Skip to content
Permalink
Browse files
[IOTDB-3428] Linear expanse RegionGroup(simple version) (#6325)
  • Loading branch information
CRZbulabula committed Jun 23, 2022
1 parent cb283d6 commit a2436515e0820e1c8304595e88a0280fb89fbe23
Showing 19 changed files with 636 additions and 241 deletions.
@@ -64,10 +64,10 @@ private AsyncDataNodeClientPool() {
/**
* Execute CreateRegionsReq asynchronously
*
* @param createRegionsReq CreateRegionsReq
* @param createRegionGroupsReq CreateRegionsReq
* @param ttlMap Map<StorageGroupName, TTL>
*/
public void createRegions(CreateRegionsReq createRegionsReq, Map<String, Long> ttlMap) {
public void createRegions(CreateRegionsReq createRegionGroupsReq, Map<String, Long> ttlMap) {

// Index of each Region
int index = 0;
@@ -78,7 +78,7 @@ public void createRegions(CreateRegionsReq createRegionsReq, Map<String, Long> t

// Assign an independent index to each Region
for (Map.Entry<String, List<TRegionReplicaSet>> entry :
createRegionsReq.getRegionMap().entrySet()) {
createRegionGroupsReq.getRegionGroupMap().entrySet()) {
for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
regionNum += regionReplicaSet.getDataNodeLocationsSize();
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
@@ -93,8 +93,8 @@ public void createRegions(CreateRegionsReq createRegionsReq, Map<String, Long> t
BitSet bitSet = new BitSet(regionNum);
for (int retry = 0; retry < 3; retry++) {
CountDownLatch latch = new CountDownLatch(regionNum - bitSet.cardinality());
createRegionsReq
.getRegionMap()
createRegionGroupsReq
.getRegionGroupMap()
.forEach(
(storageGroup, regionReplicaSets) -> {
// Enumerate each RegionReplicaSet
@@ -28,6 +28,7 @@
import org.apache.iotdb.confignode.consensus.request.read.GetRegionLocationsReq;
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.read.GetStorageGroupReq;
import org.apache.iotdb.confignode.consensus.request.write.AdjustMaxRegionGroupCountReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateFunctionReq;
@@ -118,13 +119,16 @@ public static ConfigRequest create(ByteBuffer buffer) throws IOException {
case SetTimePartitionInterval:
req = new SetTimePartitionIntervalReq();
break;
case AdjustMaxRegionGroupCount:
req = new AdjustMaxRegionGroupCountReq();
break;
case CountStorageGroup:
req = new CountStorageGroupReq();
break;
case GetStorageGroup:
req = new GetStorageGroupReq();
break;
case CreateRegions:
case CreateRegionGroups:
req = new CreateRegionsReq();
break;
case DeleteRegions:
@@ -26,11 +26,12 @@ public enum ConfigRequestType {
SetSchemaReplicationFactor,
SetDataReplicationFactor,
SetTimePartitionInterval,
AdjustMaxRegionGroupCount,
DeleteStorageGroup,
PreDeleteStorageGroup,
GetStorageGroup,
CountStorageGroup,
CreateRegions,
CreateRegionGroups,
DeleteRegions,
GetSchemaPartition,
CreateSchemaPartition,
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.consensus.request.write;

import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class AdjustMaxRegionGroupCountReq extends ConfigRequest {

// Map<StorageGroupName, Pair<maxSchemaRegionGroupCount, maxDataRegionGroupCount>>
public final Map<String, Pair<Integer, Integer>> maxRegionGroupCountMap;

public AdjustMaxRegionGroupCountReq() {
super(ConfigRequestType.AdjustMaxRegionGroupCount);
this.maxRegionGroupCountMap = new HashMap<>();
}

public void putEntry(String storageGroup, Pair<Integer, Integer> maxRegionGroupCount) {
maxRegionGroupCountMap.put(storageGroup, maxRegionGroupCount);
}

public Map<String, Pair<Integer, Integer>> getMaxRegionGroupCountMap() {
return maxRegionGroupCountMap;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(ConfigRequestType.AdjustMaxRegionGroupCount.ordinal(), stream);

ReadWriteIOUtils.write(maxRegionGroupCountMap.size(), stream);
for (Map.Entry<String, Pair<Integer, Integer>> maxRegionGroupCountEntry :
maxRegionGroupCountMap.entrySet()) {
ReadWriteIOUtils.write(maxRegionGroupCountEntry.getKey(), stream);
ReadWriteIOUtils.write(maxRegionGroupCountEntry.getValue().getLeft(), stream);
ReadWriteIOUtils.write(maxRegionGroupCountEntry.getValue().getRight(), stream);
}
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
int storageGroupNum = buffer.getInt();

for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = ReadWriteIOUtils.readString(buffer);
int maxSchemaRegionGroupCount = buffer.getInt();
int maxDataRegionGroupCount = buffer.getInt();
maxRegionGroupCountMap.put(
storageGroup, new Pair<>(maxSchemaRegionGroupCount, maxDataRegionGroupCount));
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AdjustMaxRegionGroupCountReq that = (AdjustMaxRegionGroupCountReq) o;
return maxRegionGroupCountMap.equals(that.maxRegionGroupCountMap);
}

@Override
public int hashCode() {
return Objects.hash(maxRegionGroupCountMap);
}
}
@@ -38,29 +38,29 @@
public class CreateRegionsReq extends ConfigRequest {

// Map<StorageGroupName, List<TRegionReplicaSet>>
private final Map<String, List<TRegionReplicaSet>> regionMap;
private final Map<String, List<TRegionReplicaSet>> regionGroupMap;

public CreateRegionsReq() {
super(ConfigRequestType.CreateRegions);
this.regionMap = new TreeMap<>();
super(ConfigRequestType.CreateRegionGroups);
this.regionGroupMap = new TreeMap<>();
}

public Map<String, List<TRegionReplicaSet>> getRegionMap() {
return regionMap;
public Map<String, List<TRegionReplicaSet>> getRegionGroupMap() {
return regionGroupMap;
}

public void addRegion(String storageGroup, TRegionReplicaSet regionReplicaSet) {
regionMap
public void addRegionGroup(String storageGroup, TRegionReplicaSet regionReplicaSet) {
regionGroupMap
.computeIfAbsent(storageGroup, regionReplicaSets -> new ArrayList<>())
.add(regionReplicaSet);
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeInt(ConfigRequestType.CreateRegions.ordinal());
stream.writeInt(ConfigRequestType.CreateRegionGroups.ordinal());

stream.writeInt(regionMap.size());
for (Entry<String, List<TRegionReplicaSet>> entry : regionMap.entrySet()) {
stream.writeInt(regionGroupMap.size());
for (Entry<String, List<TRegionReplicaSet>> entry : regionGroupMap.entrySet()) {
String storageGroup = entry.getKey();
List<TRegionReplicaSet> regionReplicaSets = entry.getValue();
BasicStructureSerDeUtil.write(storageGroup, stream);
@@ -76,13 +76,13 @@ protected void deserializeImpl(ByteBuffer buffer) throws IOException {
int storageGroupNum = buffer.getInt();
for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = BasicStructureSerDeUtil.readString(buffer);
regionMap.put(storageGroup, new ArrayList<>());
regionGroupMap.put(storageGroup, new ArrayList<>());

int regionReplicaSetNum = buffer.getInt();
for (int j = 0; j < regionReplicaSetNum; j++) {
TRegionReplicaSet regionReplicaSet =
ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer);
regionMap.get(storageGroup).add(regionReplicaSet);
regionGroupMap.get(storageGroup).add(regionReplicaSet);
}
}
}
@@ -92,11 +92,11 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CreateRegionsReq that = (CreateRegionsReq) o;
return regionMap.equals(that.regionMap);
return regionGroupMap.equals(that.regionGroupMap);
}

@Override
public int hashCode() {
return Objects.hash(regionMap);
return Objects.hash(regionGroupMap);
}
}

0 comments on commit a243651

Please sign in to comment.