Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions docker/runtime/doris-compose/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def add_parser(self, args_parsers):
)
group1.add_argument("--be-disks",
nargs="*",
default=["HDD=1"],
default=None,
type=str,
help="Specify each be disks, each group is \"disk_type=disk_num[,disk_capactity]\", "\
"disk_type is HDD or SSD, disk_capactity is capactity limit in gb. default: HDD=1. "\
Expand Down Expand Up @@ -628,7 +628,8 @@ def run(self, args):
args.NAME, args.IMAGE, args.cloud, args.root, args.fe_config,
args.be_config, args.ms_config, args.recycle_config,
args.remote_master_fe, args.local_network_ip, args.fe_follower,
args.be_disks, args.be_cluster, args.reg_be, args.extra_hosts,
args.be_disks if args.be_disks is not None else ["HDD=1"],
args.be_cluster, args.reg_be, args.extra_hosts,
args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr,
args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak, args.tde_sk)
LOG.info("Create new cluster {} succ, cluster path is {}".format(
Expand Down Expand Up @@ -669,9 +670,19 @@ def do_add_node(node_type, add_num, add_ids):
related_nodes.append(node)
add_ids.append(node.id)

# If --be-disks is explicitly provided for an existing cluster,
# temporarily override cluster.be_disks so newly added BEs use
# the specified disk config instead of the original cluster config.
saved_be_disks = cluster.be_disks
if args.be_disks is not None:
cluster.be_disks = args.be_disks

for node_type, add_num, add_ids in add_type_nums:
do_add_node(node_type, add_num, add_ids)

# Restore original be_disks to avoid side effects
cluster.be_disks = saved_be_disks

if args.IMAGE:
for node in related_nodes:
node.set_image(args.IMAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,9 @@ public void setPartitionCollectInfoMap(ImmutableMap<Long, PartitionCollectInfo>
// Only build from available bes, exclude colocate tables
@Override
public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(
List<Long> availableBeIds, Map<Long, Pair<TabletMove, Long>> movesInProgress) {
List<Long> availableBeIds,
Map<TStorageMedium, List<Long>> availableBeIdsByMedium,
Map<Long, Pair<TabletMove, Long>> movesInProgress) {
Set<Long> dbIds = Sets.newHashSet();
Set<Long> tableIds = Sets.newHashSet();
Set<Long> partitionIds = Sets.newHashSet();
Expand Down Expand Up @@ -986,11 +988,14 @@ public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartit
Map<Long, Long> countMap = partitionReplicasInfo.get(
tabletMeta.getPartitionId(), tabletMeta.getIndexId());
if (countMap == null) {
// If one be doesn't have any replica of one partition, it should be counted too.
countMap = availableBeIds.stream().collect(Collectors.toMap(i -> i, i -> 0L));
// If one be doesn't have any replica of one partition,
// it should be counted too.
List<Long> availableBeIdsForMedium = availableBeIdsByMedium.getOrDefault(
medium, Lists.newArrayList());
countMap = availableBeIdsForMedium.stream().collect(Collectors.toMap(i -> i, i -> 0L));
}

Long count = countMap.get(beId);
Long count = countMap.getOrDefault(beId, 0L);
countMap.put(beId, count + 1L);
partitionReplicasInfo.put(tabletMeta.getPartitionId(), tabletMeta.getIndexId(), countMap);
partitionReplicasInfoMaps.put(medium, partitionReplicasInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ public void setPartitionCollectInfoMap(ImmutableMap<Long, PartitionCollectInfo>
}

public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(
List<Long> availableBeIds, Map<Long, Pair<TabletMove, Long>> movesInProgress) {
List<Long> availableBeIds,
Map<TStorageMedium, List<Long>> availableBeIdsByMedium,
Map<Long, Pair<TabletMove, Long>> movesInProgress) {
throw new UnsupportedOperationException("buildPartitionInfoBySkew is not supported in TabletInvertedIndex");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ public void init() {
// Only count the available be
for (TStorageMedium medium : TStorageMedium.values()) {
TreeMultimap<Long, Long> beByTotalReplicaCount = TreeMultimap.create();
beLoadStatistics.stream().filter(BackendLoadStatistic::isAvailable).forEach(beStat ->
beLoadStatistics.stream().filter(BackendLoadStatistic::isAvailable)
.filter(beStat -> beStat.hasMedium(medium)).forEach(beStat ->
beByTotalReplicaCount.put(beStat.getReplicaNum(medium), beStat.getBeId()));
beByTotalReplicaCountMaps.put(medium, beByTotalReplicaCount);
}
Expand All @@ -173,9 +174,17 @@ public void init() {
.filter(BackendLoadStatistic::isAvailable)
.map(BackendLoadStatistic::getBeId)
.collect(Collectors.toList());
Map<TStorageMedium, List<Long>> availableBeIdsByMedium = Maps.newHashMap();
for (TStorageMedium medium : TStorageMedium.values()) {
availableBeIdsByMedium.put(medium, beLoadStatistics.stream()
.filter(BackendLoadStatistic::isAvailable)
.filter(be -> be.hasMedium(medium))
.map(BackendLoadStatistic::getBeId)
.collect(Collectors.toList()));
}
Map<Long, Pair<TabletMove, Long>> movesInProgress = rebalancer == null ? Maps.newHashMap()
: ((PartitionRebalancer) rebalancer).getMovesInProgress();
skewMaps = invertedIndex.buildPartitionInfoBySkew(availableBeIds, movesInProgress);
skewMaps = invertedIndex.buildPartitionInfoBySkew(availableBeIds, availableBeIdsByMedium, movesInProgress);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ protected void completeSchedCtx(TabletSchedCtx tabletCtx)

List<RootPathLoadStatistic> paths = beStat.getPathStatistics();
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
&& path.isFit(tabletCtx.getTabletSize(), false).ok())
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath, tabletCtx.getTag(),
tabletCtx.getStorageMedium());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,87 @@ public void testPartitionRebalancer() {
Assert.assertEquals(needCheckTablets.size(), succeeded.get());
}

// Test for OPENSOURCE-192: PartitionRebalancer should not generate moves
// targeting a BE that lacks the required storage medium.
// Scenario: SSD tablets on BE 20001/20002, new BE 20003 has only HDD.
// Without the fix, the algorithm would pick BE 20003 (0 SSD replicas) as the
// "least loaded" destination for SSD tablets, causing infinite scheduling failures.
@Test
public void testPartitionRebalancerSkipBEWithoutMedium() {
Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", Level.DEBUG);

// Add backends: 20001, 20002 have SSD; 20003 has only HDD
systemInfoService.addBackend(
RebalancerTestUtil.createBackend(20001L, 2048, 0, TStorageMedium.SSD));
systemInfoService.addBackend(
RebalancerTestUtil.createBackend(20002L, 2048, 0, TStorageMedium.SSD));
systemInfoService.addBackend(
RebalancerTestUtil.createBackend(20003L, 2048, 0, TStorageMedium.HDD));

// Create a table with SSD partition
OlapTable ssdTable = new OlapTable(3, "ssd table", new ArrayList<>(),
KeysType.DUP_KEYS, new RangePartitionInfo(), new HashDistributionInfo());
db.registerTable(ssdTable);

MaterializedIndex ssdIndex = new MaterializedIndex(ssdTable.getId(), null);
long partId = 41;
Partition partition = new Partition(partId, "p0", ssdIndex, new HashDistributionInfo());
ssdTable.addPartition(partition);
ssdTable.getPartitionInfo().addPartition(partId, new DataProperty(TStorageMedium.SSD),
ReplicaAllocation.DEFAULT_ALLOCATION, false, true);
ssdTable.setIndexMeta(ssdIndex.getId(), "ssd index", Lists.newArrayList(new Column()),
0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);

// Create SSD tablets: 3 replicas on BE 20001, 1 on BE 20002
// This creates skew = 3 - 1 = 2 among SSD BEs (with fix),
// or skew = 3 - 0 = 3 counting HDD-only BEs (without fix)
RebalancerTestUtil.createTablet(invertedIndex, db, ssdTable, "p0", TStorageMedium.SSD,
80001, Lists.newArrayList(20001L));
RebalancerTestUtil.createTablet(invertedIndex, db, ssdTable, "p0", TStorageMedium.SSD,
80002, Lists.newArrayList(20001L));
RebalancerTestUtil.createTablet(invertedIndex, db, ssdTable, "p0", TStorageMedium.SSD,
80003, Lists.newArrayList(20001L));
RebalancerTestUtil.createTablet(invertedIndex, db, ssdTable, "p0", TStorageMedium.SSD,
80004, Lists.newArrayList(20002L));

// Regenerate statistics with partition rebalancer
Config.tablet_rebalancer_type = "partition";
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(
Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex, null);
loadStatistic.init();
Map<Tag, LoadStatisticForTag> ssdStatMap = Maps.newHashMap();
ssdStatMap.put(Tag.DEFAULT_BACKEND_TAG, loadStatistic);

PartitionRebalancer rebalancer = new PartitionRebalancer(Env.getCurrentSystemInfo(),
Env.getCurrentInvertedIndex(), null);
rebalancer.updateLoadStatistic(ssdStatMap);
rebalancer.selectAlternativeTablets();

// Verify: moves were generated (test is meaningful)
Map<Long, Pair<PartitionRebalancer.TabletMove, Long>> moves = rebalancer.getMovesInProgress();
Assert.assertFalse("Should generate moves for skewed SSD partition", moves.isEmpty());

// Verify: no move targets BE 20003 (HDD-only) or any of the HDD BEs from setUp (10001-10004)
for (Map.Entry<Long, Pair<PartitionRebalancer.TabletMove, Long>> entry : moves.entrySet()) {
PartitionRebalancer.TabletMove move = entry.getValue().first;
Assert.assertNotEquals("Move should not target HDD-only BE for SSD tablet",
Long.valueOf(20003L), move.toBe);
Assert.assertFalse("Move should not target any BE without SSD",
move.toBe == 10001L || move.toBe == 10002L
|| move.toBe == 10003L || move.toBe == 10004L);
}

// Verify: all moves go from BE 20001 (most loaded) to BE 20002 (least loaded with SSD)
for (Map.Entry<Long, Pair<PartitionRebalancer.TabletMove, Long>> entry : moves.entrySet()) {
PartitionRebalancer.TabletMove move = entry.getValue().first;
Assert.assertEquals("Source should be the most loaded SSD BE",
Long.valueOf(20001L), move.fromBe);
Assert.assertEquals("Dest should be the least loaded SSD BE",
Long.valueOf(20002L), move.toBe);
}
LOG.info("testPartitionRebalancerSkipBEWithoutMedium success");
}

@Test
public void testMoveInProgressMap() {
Configurator.setLevel("org.apache.doris.clone.MovesInProgressCache", Level.DEBUG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ public static Backend createBackend(long id, long totalCap, long usedCap) {
return createBackend(id, totalCap, Lists.newArrayList(usedCap), 1);
}

// Add only one path with specified storage medium, PathHash:id
public static Backend createBackend(long id, long totalCap, long usedCap, TStorageMedium medium) {
Backend be = new Backend(id, "192.168.0." + id, 9051);
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo = new DiskInfo("/path1");
diskInfo.setPathHash(id);
diskInfo.setTotalCapacityB(totalCap);
diskInfo.setDataUsedCapacityB(usedCap);
diskInfo.setAvailableCapacityB(totalCap - usedCap);
diskInfo.setStorageMedium(medium);
disks.put(diskInfo.getRootPath(), diskInfo);
be.setDisks(ImmutableMap.copyOf(disks));
be.setAlive(true);
return be;
}

/**
* size of usedCaps should equal to diskNum.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,40 +524,49 @@ class SuiteCluster {
}

List<Integer> addFrontend(int num, boolean followerMode=false) throws Exception {
def result = add(num, 0, null, followerMode)
def result = add(0, num, '', false, null)
return result.first
}

List<Integer> addBackend(int num, List<String> beDisks) throws Exception {
def result = add(0, num, '', false, beDisks)
return result.second
}

List<Integer> addBackend(int num, String ClusterName='') throws Exception {
def result = add(0, num, ClusterName)
return result.second
}

// ATTN: clusterName just used for cloud mode, 1 cluster has n bes
// ATTN: followerMode just used for cloud mode
Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String clusterName, boolean followerMode=false) throws Exception {
// ATTN: beDisks just used for not cloud mode
Tuple2<List<Integer>, List<Integer>> add(int feNum, int beNum, String clusterName, boolean followerMode=false, List<String> beDisks=null) throws Exception {
assert feNum > 0 || beNum > 0

def sb = new StringBuilder()
sb.append('up ' + name + ' ')
def cmd = ['up', name]
if (feNum > 0) {
sb.append('--add-fe-num ' + feNum + ' ')
cmd += ['--add-fe-num', String.valueOf(feNum)]
if (followerMode) {
sb.append('--fe-follower' + ' ')
cmd += ['--fe-follower']
}
if (sqlModeNodeMgr) {
sb.append('--sql-mode-node-mgr' + ' ')
cmd += ['--sql-mode-node-mgr']
}
}
if (beNum > 0) {
sb.append('--add-be-num ' + beNum + ' ')
cmd += ['--add-be-num', String.valueOf(beNum)]
if (clusterName != null && !clusterName.isEmpty()) {
sb.append(' --be-cluster ' + clusterName + ' ')
cmd += ['--be-cluster', clusterName]
}
}
sb.append('--wait-timeout 60')
if (beDisks != null && !beDisks.isEmpty()) {
cmd += ['--be-disks']
cmd += beDisks
}
cmd += ['--wait-timeout', '60']

def data = (Map<String, Map<String, Object>>) runCmd(sb.toString(), 180)
def data = (Map<String, Map<String, Object>>) runCmdList(cmd, 180)
def newFrontends = (List<Integer>) data.get('fe').get('add_list')
def newBackends = (List<Integer>) data.get('be').get('add_list')

Expand Down
Loading
Loading