diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java index 37fec43648e4a..688bd68249330 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java @@ -33,6 +33,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; @@ -519,6 +521,84 @@ public void testGetSlots() throws Exception { } } + @Test + public void testGetRegionGroupsByTime() throws Exception { + final String sg0 = "root.sg0"; + + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + + // Get all region groups covering the full time range + TGetRegionGroupsByTimeReq req = new TGetRegionGroupsByTimeReq(sg0, 0L, Long.MAX_VALUE); + TGetRegionGroupsByTimeResp resp = client.getRegionGroupsByTime(req); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode()); + Assert.assertNotNull(resp.getRegionReplicaSets()); + Assert.assertFalse(resp.getRegionReplicaSets().isEmpty()); + int allRegionGroupCount = resp.getRegionReplicaSetsSize(); + + // Each replica set should have testReplicationFactor replicas + resp.getRegionReplicaSets() + .forEach( + replicaSet -> { + Assert.assertEquals(testReplicationFactor, replicaSet.getDataNodeLocationsSize()); + Assert.assertEquals( + TConsensusGroupType.DataRegion, replicaSet.getRegionId().getType()); + }); + + // Query with a single time slot range should return a subset + TGetRegionGroupsByTimeReq singleSlotReq = + new TGetRegionGroupsByTimeReq(sg0, 0L, testTimePartitionInterval - 1); + TGetRegionGroupsByTimeResp singleSlotResp = client.getRegionGroupsByTime(singleSlotReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), singleSlotResp.getStatus().getCode()); + Assert.assertNotNull(singleSlotResp.getRegionReplicaSets()); + Assert.assertFalse(singleSlotResp.getRegionReplicaSets().isEmpty()); + Assert.assertTrue(singleSlotResp.getRegionReplicaSetsSize() <= allRegionGroupCount); + + // Query with a disjoint time range should return empty result + TGetRegionGroupsByTimeReq disjointReq = + new TGetRegionGroupsByTimeReq( + sg0, + testTimePartitionSlotsNum * testTimePartitionInterval * 2, + testTimePartitionSlotsNum * testTimePartitionInterval * 3); + TGetRegionGroupsByTimeResp disjointResp = client.getRegionGroupsByTime(disjointReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), disjointResp.getStatus().getCode()); + Assert.assertTrue( + disjointResp.getRegionReplicaSets() == null + || disjointResp.getRegionReplicaSets().isEmpty()); + + // Query non-existent database should return empty result + TGetRegionGroupsByTimeReq nonExistReq = + new TGetRegionGroupsByTimeReq("root.nonexistent", 0L, Long.MAX_VALUE); + TGetRegionGroupsByTimeResp nonExistResp = client.getRegionGroupsByTime(nonExistReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), nonExistResp.getStatus().getCode()); + Assert.assertTrue( + nonExistResp.getRegionReplicaSets() == null + || nonExistResp.getRegionReplicaSets().isEmpty()); + + // Verify consistency: union of per-slot queries should equal full-range query + Set unionRegionIds = new HashSet<>(); + for (long t = 0; t < testTimePartitionSlotsNum; t++) { + TGetRegionGroupsByTimeReq perSlotReq = + new TGetRegionGroupsByTimeReq( + sg0, t * testTimePartitionInterval, t * testTimePartitionInterval); + TGetRegionGroupsByTimeResp perSlotResp = client.getRegionGroupsByTime(perSlotReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), perSlotResp.getStatus().getCode()); + if (perSlotResp.getRegionReplicaSets() != null) { + perSlotResp + .getRegionReplicaSets() + .forEach(replicaSet -> unionRegionIds.add(replicaSet.getRegionId())); + } + } + Set allRegionIds = new HashSet<>(); + resp.getRegionReplicaSets().forEach(replicaSet -> allRegionIds.add(replicaSet.getRegionId())); + Assert.assertEquals(allRegionIds, unionRegionIds); + } + } + @Test public void testGetSchemaNodeManagementPartition() throws Exception { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index ffe333b56dd78..275a0a502616d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException; import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan; +import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan; import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan; import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan; import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan; @@ -248,6 +249,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept case RemoveRegionLocation: plan = new RemoveRegionLocationPlan(); break; + case GetRegionGroupsByTime: + plan = new GetRegionGroupsByTimePlan(); + break; case OfferRegionMaintainTasks: plan = new OfferRegionMaintainTasksPlan(); break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index fe04b93d9ad4b..3eee7caf37a6f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -71,6 +71,7 @@ public enum ConfigPhysicalPlanType { CountTimeSlotList((short) 310), AddRegionLocation((short) 311), RemoveRegionLocation((short) 312), + GetRegionGroupsByTime((short) 313), /** Partition. */ GetSchemaPartition((short) 400), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionGroupsByTimePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionGroupsByTimePlan.java new file mode 100644 index 0000000000000..d946022771da5 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionGroupsByTimePlan.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.read.region; + +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; +import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; +import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class GetRegionGroupsByTimePlan extends ConfigPhysicalReadPlan { + + private String database; + + private TTimePartitionSlot startTimeSlot; + + private TTimePartitionSlot endTimeSlot; + + public GetRegionGroupsByTimePlan() { + super(ConfigPhysicalPlanType.GetRegionGroupsByTime); + } + + public GetRegionGroupsByTimePlan( + final String database, final long startTime, final long endTime) { + super(ConfigPhysicalPlanType.GetRegionGroupsByTime); + this.database = database; + this.startTimeSlot = TimePartitionUtils.getTimePartitionSlot(startTime); + this.endTimeSlot = TimePartitionUtils.getTimePartitionSlot(endTime); + } + + public String getDatabase() { + return database; + } + + public TTimePartitionSlot getStartTimeSlot() { + return startTimeSlot; + } + + public TTimePartitionSlot getEndTimeSlot() { + return endTimeSlot; + } + + @Override + protected void serializeImpl(final DataOutputStream stream) throws IOException { + stream.writeShort(getType().getPlanType()); + BasicStructureSerDeUtil.write(database, stream); + stream.writeLong(startTimeSlot.getStartTime()); + stream.writeLong(endTimeSlot.getStartTime()); + } + + @Override + protected void deserializeImpl(final ByteBuffer buffer) throws IOException { + database = BasicStructureSerDeUtil.readString(buffer); + startTimeSlot = new TTimePartitionSlot(buffer.getLong()); + endTimeSlot = new TTimePartitionSlot(buffer.getLong()); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final GetRegionGroupsByTimePlan that = (GetRegionGroupsByTimePlan) o; + return Objects.equals(database, that.database) + && Objects.equals(startTimeSlot, that.startTimeSlot) + && Objects.equals(endTimeSlot, that.endTimeSlot); + } + + @Override + public int hashCode() { + return Objects.hash(database, startTimeSlot, endTimeSlot); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/partition/GetRegionGroupsByTimeResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/partition/GetRegionGroupsByTimeResp.java new file mode 100644 index 0000000000000..3166ec30df14f --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/partition/GetRegionGroupsByTimeResp.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.response.partition; + +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; +import org.apache.iotdb.consensus.common.DataSet; +import org.apache.iotdb.rpc.TSStatusCode; + +import java.util.Set; + +public class GetRegionGroupsByTimeResp implements DataSet { + + private final TSStatus status; + + private final Set regionReplicaSets; + + public GetRegionGroupsByTimeResp( + final TSStatus status, final Set regionReplicaSets) { + this.status = status; + this.regionReplicaSets = regionReplicaSets; + } + + public TSStatus getStatus() { + return status; + } + + public TGetRegionGroupsByTimeResp convertToRpcResp() { + TGetRegionGroupsByTimeResp resp = new TGetRegionGroupsByTimeResp(); + resp.setStatus(status); + + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + resp.setRegionReplicaSets(regionReplicaSets); + } + + return resp; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 182dc2f9fb249..b13acd8d95889 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -199,6 +199,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; @@ -2569,6 +2571,14 @@ public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) { : new TGetSeriesSlotListResp(status); } + @Override + public TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req) { + TSStatus status = confirmLeader(); + return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + ? partitionManager.getRegionGroupsByTime(req).convertToRpcResp() + : new TGetRegionGroupsByTimeResp(status); + } + @Override public TSStatus migrateRegion(TMigrateRegionReq req) { TSStatus status = confirmLeader(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index 02c82164595df..50601dff9ac57 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -118,6 +118,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; @@ -854,6 +856,13 @@ TPermissionInfoResp login( */ TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req); + /** + * Get DataRegion groups that overlap a time range for the given database. + * + * @return TGetRegionGroupsByTimeResp. + */ + TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req); + TSStatus migrateRegion(TMigrateRegionReq req); TSStatus reconstructRegion(TReconstructRegionReq req); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 576d805c78624..607d670006cc0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -49,6 +49,7 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; +import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; @@ -60,6 +61,7 @@ import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; import org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp; import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp; +import org.apache.iotdb.confignode.consensus.response.partition.GetRegionGroupsByTimeResp; import org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp; import org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp; import org.apache.iotdb.confignode.consensus.response.partition.GetTimeSlotListResp; @@ -82,6 +84,7 @@ import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType; import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; @@ -1180,6 +1183,19 @@ public GetRegionIdResp getRegionId(final TGetRegionIdReq req) { } } + public GetRegionGroupsByTimeResp getRegionGroupsByTime(final TGetRegionGroupsByTimeReq req) { + final GetRegionGroupsByTimePlan plan = + new GetRegionGroupsByTimePlan(req.getDatabase(), req.getStartTime(), req.getEndTime()); + try { + return (GetRegionGroupsByTimeResp) getConsensusManager().read(plan); + } catch (final ConsensusException e) { + LOGGER.warn(CONSENSUS_READ_ERROR, e); + final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new GetRegionGroupsByTimeResp(res, Collections.emptySet()); + } + } + public GetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) { long startTime = req.isSetStartTime() ? req.getStartTime() : Long.MIN_VALUE; long endTime = req.isSetEndTime() ? req.getEndTime() : Long.MAX_VALUE; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 6c9351e881aa8..7253eae3279ad 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -43,6 +43,7 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan; +import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.read.table.DescTablePlan; @@ -355,6 +356,8 @@ public DataSet executeQueryPlan(final ConfigPhysicalReadPlan req) return partitionInfo.countTimeSlotList((CountTimeSlotListPlan) req); case GetSeriesSlotList: return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req); + case GetRegionGroupsByTime: + return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan) req); case SHOW_CQ: return cqInfo.showCQ(); case ShowExternalService: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index 1d2d776c69ba8..5a80364f0331a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -501,6 +501,17 @@ public List getRegionId( } } + public Set getRegionGroupsByTime( + TTimePartitionSlot startTimeSlot, TTimePartitionSlot endTimeSlot) { + List regionIds = + dataPartitionTable.getRegionId(new TSeriesPartitionSlot(-1), startTimeSlot, endTimeSlot); + return regionIds.stream() + .distinct() + .filter(regionGroupMap::containsKey) + .map(id -> regionGroupMap.get(id).getReplicaSet()) + .collect(Collectors.toSet()); + } + public List getTimeSlotList( TSeriesPartitionSlot seriesSlotId, TConsensusGroupId regionId, long startTime, long endTime) { return dataPartitionTable.getTimeSlotList(seriesSlotId, regionId, startTime, endTime); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index b907527416bda..5c2c93daeabed 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; +import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; @@ -53,6 +54,7 @@ import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; import org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp; import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp; +import org.apache.iotdb.confignode.consensus.response.partition.GetRegionGroupsByTimeResp; import org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp; import org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp; import org.apache.iotdb.confignode.consensus.response.partition.GetTimeSlotListResp; @@ -1105,6 +1107,18 @@ public DataSet getRegionId(GetRegionIdPlan plan) { .collect(Collectors.toList())); } + public DataSet getRegionGroupsByTime(GetRegionGroupsByTimePlan plan) { + if (!isDatabaseExisted(plan.getDatabase())) { + return new GetRegionGroupsByTimeResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new HashSet<>()); + } + DatabasePartitionTable databasePartitionTable = databasePartitionTables.get(plan.getDatabase()); + return new GetRegionGroupsByTimeResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + databasePartitionTable.getRegionGroupsByTime( + plan.getStartTimeSlot(), plan.getEndTimeSlot())); + } + /** * Get the timePartition of the specific Database or seriesSlotId(device) or regionId. * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 4d01f3770c218..52f330bcdce5c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -167,6 +167,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; @@ -1333,6 +1335,11 @@ public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) { return configManager.getSeriesSlotList(req); } + @Override + public TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req) { + return configManager.getRegionGroupsByTime(req); + } + @Override public TSStatus migrateRegion(TMigrateRegionReq req) { return configManager.migrateRegion(req); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index f203bd5d77f33..5ff3fb1f33bc3 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -66,6 +66,7 @@ import org.apache.iotdb.commons.udf.UDFInformation; import org.apache.iotdb.commons.udf.UDFType; import org.apache.iotdb.commons.utils.TimePartitionUtils; +import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionGroupsByTimePlan; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorRelationalPlan; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorTreePlan; @@ -2062,4 +2063,13 @@ public void removeRegionLocationPlanTest() throws IOException { (RemoveRegionLocationPlan) ConfigPhysicalPlan.Factory.create(plan.serializeToByteBuffer()); Assert.assertEquals(plan, dePlan); } + + @Test + public void GetRegionGroupsByTimePlanTest() throws IOException { + GetRegionGroupsByTimePlan plan0 = new GetRegionGroupsByTimePlan("root.sg0", 0L, 604800000L); + GetRegionGroupsByTimePlan plan1 = + (GetRegionGroupsByTimePlan) + ConfigPhysicalPlan.Factory.create(plan0.serializeToByteBuffer()); + Assert.assertEquals(plan0, plan1); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java index e2c04caedfb20..e0001c448bbad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java @@ -125,6 +125,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesReq; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdResp; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; @@ -1302,6 +1304,13 @@ public TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) throw () -> client.getSeriesSlotList(req), resp -> !updateConfigNodeLeader(resp.status)); } + @Override + public TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req) + throws TException { + return executeRemoteCallWithRetry( + () -> client.getRegionGroupsByTime(req), resp -> !updateConfigNodeLeader(resp.status)); + } + @Override public TSStatus migrateRegion(TMigrateRegionReq req) throws TException { return executeRemoteCallWithRetry( diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 92312ee81a307..b1af203f4a13a 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -313,6 +313,17 @@ struct TCountTimeSlotListResp { 2: optional i64 count } +struct TGetRegionGroupsByTimeReq { + 1: required string database + 2: required i64 startTime + 3: required i64 endTime +} + +struct TGetRegionGroupsByTimeResp { + 1: required common.TSStatus status + 2: optional set regionReplicaSets +} + struct TGetSeriesSlotListReq { 1: required string database 2: required common.TConsensusGroupType type @@ -1971,6 +1982,9 @@ service IConfigNodeRPCService { /** Get the given database's assigned SeriesSlots */ TGetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) + /** Get a database's DataRegion groups that overlap a time range */ + TGetRegionGroupsByTimeResp getRegionGroupsByTime(TGetRegionGroupsByTimeReq req) + // ==================================================== // CQ // ====================================================