From edbd7793ae1c5d234cbaf4fc07f7130595577864 Mon Sep 17 00:00:00 2001 From: J-HowHuang Date: Tue, 3 Jun 2025 15:34:34 -0700 Subject: [PATCH 1/5] add batchSizePerServer check into rebalance config precheck --- .../rebalance/DefaultRebalancePreChecker.java | 30 +++++++++++++++++-- .../core/rebalance/RebalancePreChecker.java | 9 +++++- .../helix/core/rebalance/TableRebalancer.java | 16 +++++----- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java index d53256bc1859..684482e805ad 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -101,7 +101,8 @@ public Map check(PreCheckContext preCheckCont preCheckContext.getTableSubTypeSizeDetails(), _diskUtilizationThreshold, false)); preCheckResult.put(REBALANCE_CONFIG_OPTIONS, checkRebalanceConfig(rebalanceConfig, tableConfig, - preCheckContext.getCurrentAssignment(), preCheckContext.getTargetAssignment())); + preCheckContext.getCurrentAssignment(), preCheckContext.getTargetAssignment(), + preCheckContext.getRebalanceSummaryResult())); preCheckResult.put(REPLICA_GROUPS_INFO, checkReplicaGroups(tableConfig, rebalanceConfig)); @@ -335,7 +336,8 @@ private RebalancePreCheckerResult checkDiskUtilization(Map> currentAssignment, Map> targetAssignment) { + Map> currentAssignment, Map> targetAssignment, + RebalanceSummaryResult rebalanceSummaryResult) { List warnings = new ArrayList<>(); boolean pass = true; if (rebalanceConfig.isBestEfforts()) { @@ -392,6 +394,29 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance warnings.add("updateTargetTier should be enabled when tier configs 
are present"); } + // --- Batch size per server recommendation check using summary --- + int maxSegmentsToAddOnServer = 0; + if (rebalanceSummaryResult.getServerInfo() != null && + rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo() != null) { + for (RebalanceSummaryResult.ServerSegmentChangeInfo info : rebalanceSummaryResult.getServerInfo() + .getServerSegmentChangeInfo() + .values()) { + maxSegmentsToAddOnServer = Math.max(maxSegmentsToAddOnServer, info.getSegmentsAdded()); + } + } + int batchSizePerServer = rebalanceConfig.getBatchSizePerServer(); + final int SEGMENT_ADD_THRESHOLD = 500; + final int RECOMMENDED_BATCH_SIZE = 200; + if (maxSegmentsToAddOnServer > SEGMENT_ADD_THRESHOLD) { + if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER + || batchSizePerServer > RECOMMENDED_BATCH_SIZE) { + pass = false; + warnings.add("Number of segments to add to a single server (" + maxSegmentsToAddOnServer + ") is high (>" + + SEGMENT_ADD_THRESHOLD + "). It is recommended to set batchSizePerServer to " + RECOMMENDED_BATCH_SIZE + + " or lower to avoid excessive load on servers."); + } + } + return pass ? 
RebalancePreCheckerResult.pass("All rebalance parameters look good") : RebalancePreCheckerResult.warn(StringUtil.join("\n", warnings.toArray(String[]::new))); } @@ -463,3 +488,4 @@ private long getAverageSegmentSize(TableSizeReader.TableSubTypeSizeDetails table return tableSizePerReplicaInBytes / ((long) currentAssignment.size()); } } + diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java index 83b7302d2f22..91e80bb497d1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java @@ -38,10 +38,12 @@ class PreCheckContext { private final Map> _targetAssignment; private final TableSizeReader.TableSubTypeSizeDetails _tableSubTypeSizeDetails; private final RebalanceConfig _rebalanceConfig; + private final RebalanceSummaryResult _rebalanceSummaryResult; public PreCheckContext(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig, Map> currentAssignment, Map> targetAssignment, - @Nullable TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig) { + @Nullable TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig, + RebalanceSummaryResult rebalanceSummaryResult) { _rebalanceJobId = rebalanceJobId; _tableNameWithType = tableNameWithType; _tableConfig = tableConfig; @@ -49,6 +51,7 @@ public PreCheckContext(String rebalanceJobId, String tableNameWithType, TableCon _targetAssignment = targetAssignment; _tableSubTypeSizeDetails = tableSubTypeSizeDetails; _rebalanceConfig = rebalanceConfig; + _rebalanceSummaryResult = rebalanceSummaryResult; } public String getRebalanceJobId() { @@ -78,6 +81,10 @@ public TableSizeReader.TableSubTypeSizeDetails 
getTableSubTypeSizeDetails() { public RebalanceConfig getRebalanceConfig() { return _rebalanceConfig; } + + public RebalanceSummaryResult getRebalanceSummaryResult() { + return _rebalanceSummaryResult; + } } Map check(PreCheckContext preCheckContext); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 934b4083f19b..0dfd66de6f0a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -339,22 +339,24 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb fetchTableSizeDetails(tableNameWithType, tableRebalanceLogger); Map preChecksResult = null; + + // Calculate summary here itself so that even if the table is already balanced, the caller can verify whether that + // is expected or not based on the summary results + RebalanceSummaryResult summaryResult = + calculateDryRunSummary(currentAssignment, targetAssignment, tableNameWithType, tableSubTypeSizeDetails, + tableConfig, tableRebalanceLogger); + if (preChecks) { if (_rebalancePreChecker == null) { tableRebalanceLogger.warn( "Pre-checks are enabled but the pre-checker is not set, skipping pre-checks"); } else { RebalancePreChecker.PreCheckContext preCheckContext = - new RebalancePreChecker.PreCheckContext(rebalanceJobId, tableNameWithType, - tableConfig, currentAssignment, targetAssignment, tableSubTypeSizeDetails, rebalanceConfig); + new RebalancePreChecker.PreCheckContext(rebalanceJobId, tableNameWithType, tableConfig, currentAssignment, + targetAssignment, tableSubTypeSizeDetails, rebalanceConfig, summaryResult); preChecksResult = _rebalancePreChecker.check(preCheckContext); } } - // Calculate summary here itself so that even if the table is already 
balanced, the caller can verify whether that - // is expected or not based on the summary results - RebalanceSummaryResult summaryResult = - calculateDryRunSummary(currentAssignment, targetAssignment, tableNameWithType, tableSubTypeSizeDetails, - tableConfig, tableRebalanceLogger); if (segmentAssignmentUnchanged) { tableRebalanceLogger.info("Table is already balanced"); From 529bb4a0da1a7b55442aadef29a2613001115327 Mon Sep 17 00:00:00 2001 From: J-HowHuang Date: Tue, 3 Jun 2025 16:07:36 -0700 Subject: [PATCH 2/5] add test --- .../rebalance/DefaultRebalancePreChecker.java | 5 ++- .../TableRebalancerClusterStatelessTest.java | 42 ++++++++++++++++++- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java index 684482e805ad..5ffe42e4a033 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -59,6 +59,9 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { public static final String REBALANCE_CONFIG_OPTIONS = "rebalanceConfigOptions"; public static final String REPLICA_GROUPS_INFO = "replicaGroupsInfo"; + public static final int SEGMENT_ADD_THRESHOLD = 200; + public static final int RECOMMENDED_BATCH_SIZE = 200; + private static double _diskUtilizationThreshold; protected PinotHelixResourceManager _pinotHelixResourceManager; @@ -405,8 +408,6 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance } } int batchSizePerServer = rebalanceConfig.getBatchSizePerServer(); - final int SEGMENT_ADD_THRESHOLD = 500; - final int RECOMMENDED_BATCH_SIZE = 200; if (maxSegmentsToAddOnServer > SEGMENT_ADD_THRESHOLD) { 
if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER || batchSizePerServer > RECOMMENDED_BATCH_SIZE) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index d7e111d83de1..afa4221a2cf8 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -1053,7 +1053,7 @@ public void testRebalancePreCheckerRebalanceConfig() assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.WARN); assertEquals(preCheckerResult.getMessage(), "Number of replicas (3) is greater than 1, downtime is not recommended.\nDowntime or minAvailableReplicas=0 " - + "for pauseless tables may cause data loss during rebalance"); + + "for pauseless tables may cause data loss during rebalance"); rebalanceConfig.setDowntime(false); rebalanceConfig.setMinAvailableReplicas(-3); @@ -1081,9 +1081,47 @@ public void testRebalancePreCheckerRebalanceConfig() assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); assertEquals(preCheckerResult.getMessage(), "All rebalance parameters look good"); + // Add more segments + int additionalNumSegments = DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + 1; + for (int i = 0; i < additionalNumSegments; i++) { + _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + (numSegments + i)), null); + } + + // Add one more server instance + String instanceId = "preCheckerRebalanceConfig_" + SERVER_INSTANCE_ID_PREFIX + numServers; + addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true); 
+ + // Change the number of replicas from 3 to 4 + newTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(4).build(); + + // The new (4th) server should now be assigned all the existing segments (including consuming ones) + rebalanceResult = tableRebalancer.rebalance(newTableConfig, rebalanceConfig, null); + int expectedNumSegmentsToAdd = FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS + additionalNumSegments + numSegments; + assertEquals(rebalanceResult.getRebalanceSummaryResult() + .getServerInfo() + .getServerSegmentChangeInfo() + .get(instanceId) + .getSegmentsAdded(), expectedNumSegmentsToAdd); + preCheckerResult = rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS); + assertNotNull(preCheckerResult); + assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.WARN); + assertEquals(preCheckerResult.getMessage(), + "Number of segments to add to a single server (" + expectedNumSegmentsToAdd + ") is high (>" + + DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + "). 
It is recommended to set batchSizePerServer to " + + DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE + + " or lower to avoid excessive load on servers."); + + rebalanceConfig.setBatchSizePerServer(DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE); + rebalanceResult = tableRebalancer.rebalance(newTableConfig, rebalanceConfig, null); + preCheckerResult = rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS); + assertNotNull(preCheckerResult); + assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS); + assertEquals(preCheckerResult.getMessage(), "All rebalance parameters look good"); + _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME); - for (int i = 0; i < numServers; i++) { + for (int i = 0; i < numServers + 1; i++) { stopAndDropFakeInstance("preCheckerRebalanceConfig_" + SERVER_INSTANCE_ID_PREFIX + i); } executorService.shutdown(); From 0058e90c061074962f3abecde386715e2f93c200 Mon Sep 17 00:00:00 2001 From: J-HowHuang Date: Tue, 3 Jun 2025 16:23:19 -0700 Subject: [PATCH 3/5] lint: style --- .../helix/core/rebalance/DefaultRebalancePreChecker.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java index 5ffe42e4a033..4e9a791e661b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -399,8 +399,8 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance // --- Batch size per server recommendation check using summary --- int maxSegmentsToAddOnServer = 0; - if (rebalanceSummaryResult.getServerInfo() != null && - 
rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo() != null) { + if (rebalanceSummaryResult.getServerInfo() != null + && rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo() != null) { for (RebalanceSummaryResult.ServerSegmentChangeInfo info : rebalanceSummaryResult.getServerInfo() .getServerSegmentChangeInfo() .values()) { @@ -489,4 +489,3 @@ private long getAverageSegmentSize(TableSizeReader.TableSubTypeSizeDetails table return tableSizePerReplicaInBytes / ((long) currentAssignment.size()); } } - From 72a95862c93c72f62a47f09f3b81e06a145267de Mon Sep 17 00:00:00 2001 From: J-HowHuang Date: Tue, 3 Jun 2025 16:26:45 -0700 Subject: [PATCH 4/5] use segmentInfo.maxSegmentsAddedToASingleServer instead --- .../core/rebalance/DefaultRebalancePreChecker.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java index 4e9a791e661b..a893ada3bef7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -398,15 +398,7 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance } // --- Batch size per server recommendation check using summary --- - int maxSegmentsToAddOnServer = 0; - if (rebalanceSummaryResult.getServerInfo() != null - && rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo() != null) { - for (RebalanceSummaryResult.ServerSegmentChangeInfo info : rebalanceSummaryResult.getServerInfo() - .getServerSegmentChangeInfo() - .values()) { - maxSegmentsToAddOnServer = Math.max(maxSegmentsToAddOnServer, info.getSegmentsAdded()); - } - } + int maxSegmentsToAddOnServer = 
rebalanceSummaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer(); int batchSizePerServer = rebalanceConfig.getBatchSizePerServer(); if (maxSegmentsToAddOnServer > SEGMENT_ADD_THRESHOLD) { if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER From 8854e275df259a101611afdfa2155112454b0625 Mon Sep 17 00:00:00 2001 From: J-HowHuang Date: Tue, 3 Jun 2025 16:28:30 -0700 Subject: [PATCH 5/5] update test according to the last commit --- .../core/rebalance/TableRebalancerClusterStatelessTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index afa4221a2cf8..a25d1780910f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -1103,6 +1103,8 @@ public void testRebalancePreCheckerRebalanceConfig() .getServerSegmentChangeInfo() .get(instanceId) .getSegmentsAdded(), expectedNumSegmentsToAdd); + assertEquals(rebalanceResult.getRebalanceSummaryResult().getSegmentInfo().getMaxSegmentsAddedToASingleServer(), + expectedNumSegmentsToAdd); preCheckerResult = rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS); assertNotNull(preCheckerResult); assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.WARN);