Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker {
public static final String REBALANCE_CONFIG_OPTIONS = "rebalanceConfigOptions";
public static final String REPLICA_GROUPS_INFO = "replicaGroupsInfo";

public static final int SEGMENT_ADD_THRESHOLD = 200;
public static final int RECOMMENDED_BATCH_SIZE = 200;

private static double _diskUtilizationThreshold;

protected PinotHelixResourceManager _pinotHelixResourceManager;
Expand Down Expand Up @@ -101,7 +104,8 @@ public Map<String, RebalancePreCheckerResult> check(PreCheckContext preCheckCont
preCheckContext.getTableSubTypeSizeDetails(), _diskUtilizationThreshold, false));

preCheckResult.put(REBALANCE_CONFIG_OPTIONS, checkRebalanceConfig(rebalanceConfig, tableConfig,
preCheckContext.getCurrentAssignment(), preCheckContext.getTargetAssignment()));
preCheckContext.getCurrentAssignment(), preCheckContext.getTargetAssignment(),
preCheckContext.getRebalanceSummaryResult()));

preCheckResult.put(REPLICA_GROUPS_INFO, checkReplicaGroups(tableConfig, rebalanceConfig));

Expand Down Expand Up @@ -335,7 +339,8 @@ private RebalancePreCheckerResult checkDiskUtilization(Map<String, Map<String, S
}

private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalanceConfig, TableConfig tableConfig,
Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment) {
Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
RebalanceSummaryResult rebalanceSummaryResult) {
List<String> warnings = new ArrayList<>();
boolean pass = true;
if (rebalanceConfig.isBestEfforts()) {
Expand Down Expand Up @@ -392,6 +397,19 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance
warnings.add("updateTargetTier should be enabled when tier configs are present");
}

// --- Batch size per server recommendation check using summary ---
int maxSegmentsToAddOnServer = rebalanceSummaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer();
int batchSizePerServer = rebalanceConfig.getBatchSizePerServer();
if (maxSegmentsToAddOnServer > SEGMENT_ADD_THRESHOLD) {
if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER
|| batchSizePerServer > RECOMMENDED_BATCH_SIZE) {
pass = false;
warnings.add("Number of segments to add to a single server (" + maxSegmentsToAddOnServer + ") is high (>"
+ SEGMENT_ADD_THRESHOLD + "). It is recommended to set batchSizePerServer to " + RECOMMENDED_BATCH_SIZE
+ " or lower to avoid excessive load on servers.");
}
}

return pass ? RebalancePreCheckerResult.pass("All rebalance parameters look good")
: RebalancePreCheckerResult.warn(StringUtil.join("\n", warnings.toArray(String[]::new)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,20 @@ class PreCheckContext {
private final Map<String, Map<String, String>> _targetAssignment;
private final TableSizeReader.TableSubTypeSizeDetails _tableSubTypeSizeDetails;
private final RebalanceConfig _rebalanceConfig;
private final RebalanceSummaryResult _rebalanceSummaryResult;

public PreCheckContext(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig,
Map<String, Map<String, String>> currentAssignment, Map<String, Map<String, String>> targetAssignment,
@Nullable TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig) {
@Nullable TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, RebalanceConfig rebalanceConfig,
RebalanceSummaryResult rebalanceSummaryResult) {
_rebalanceJobId = rebalanceJobId;
_tableNameWithType = tableNameWithType;
_tableConfig = tableConfig;
_currentAssignment = currentAssignment;
_targetAssignment = targetAssignment;
_tableSubTypeSizeDetails = tableSubTypeSizeDetails;
_rebalanceConfig = rebalanceConfig;
_rebalanceSummaryResult = rebalanceSummaryResult;
}

public String getRebalanceJobId() {
Expand Down Expand Up @@ -78,6 +81,10 @@ public TableSizeReader.TableSubTypeSizeDetails getTableSubTypeSizeDetails() {
/**
 * Returns the rebalance configuration that was passed to this context's constructor.
 *
 * @return the {@code RebalanceConfig} held by this pre-check context
 */
public RebalanceConfig getRebalanceConfig() {
return _rebalanceConfig;
}

/**
 * Returns the rebalance summary that was passed to this context's constructor, so that pre-checks can inspect it.
 *
 * @return the {@code RebalanceSummaryResult} held by this pre-check context
 */
public RebalanceSummaryResult getRebalanceSummaryResult() {
return _rebalanceSummaryResult;
}
}

Map<String, RebalancePreCheckerResult> check(PreCheckContext preCheckContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,22 +339,24 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
fetchTableSizeDetails(tableNameWithType, tableRebalanceLogger);

Map<String, RebalancePreCheckerResult> preChecksResult = null;

// Calculate summary here itself so that even if the table is already balanced, the caller can verify whether that
// is expected or not based on the summary results
RebalanceSummaryResult summaryResult =
calculateDryRunSummary(currentAssignment, targetAssignment, tableNameWithType, tableSubTypeSizeDetails,
tableConfig, tableRebalanceLogger);

if (preChecks) {
if (_rebalancePreChecker == null) {
tableRebalanceLogger.warn(
"Pre-checks are enabled but the pre-checker is not set, skipping pre-checks");
} else {
RebalancePreChecker.PreCheckContext preCheckContext =
new RebalancePreChecker.PreCheckContext(rebalanceJobId, tableNameWithType,
tableConfig, currentAssignment, targetAssignment, tableSubTypeSizeDetails, rebalanceConfig);
new RebalancePreChecker.PreCheckContext(rebalanceJobId, tableNameWithType, tableConfig, currentAssignment,
targetAssignment, tableSubTypeSizeDetails, rebalanceConfig, summaryResult);
preChecksResult = _rebalancePreChecker.check(preCheckContext);
}
}
// Calculate summary here itself so that even if the table is already balanced, the caller can verify whether that
// is expected or not based on the summary results
RebalanceSummaryResult summaryResult =
calculateDryRunSummary(currentAssignment, targetAssignment, tableNameWithType, tableSubTypeSizeDetails,
tableConfig, tableRebalanceLogger);

if (segmentAssignmentUnchanged) {
tableRebalanceLogger.info("Table is already balanced");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ public void testRebalancePreCheckerRebalanceConfig()
assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.WARN);
assertEquals(preCheckerResult.getMessage(),
"Number of replicas (3) is greater than 1, downtime is not recommended.\nDowntime or minAvailableReplicas=0 "
+ "for pauseless tables may cause data loss during rebalance");
+ "for pauseless tables may cause data loss during rebalance");

rebalanceConfig.setDowntime(false);
rebalanceConfig.setMinAvailableReplicas(-3);
Expand Down Expand Up @@ -1081,9 +1081,49 @@ public void testRebalancePreCheckerRebalanceConfig()
assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS);
assertEquals(preCheckerResult.getMessage(), "All rebalance parameters look good");

// Add more segments
int additionalNumSegments = DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + 1;
for (int i = 0; i < additionalNumSegments; i++) {
_helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + (numSegments + i)), null);
}

// Add one more server instance
String instanceId = "preCheckerRebalanceConfig_" + SERVER_INSTANCE_ID_PREFIX + numServers;
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);

// change num replicas from 3 to 4
newTableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(4).build();

// Now the new server (the 4th server) should have all the existing segments (including consuming ones) added to it
rebalanceResult = tableRebalancer.rebalance(newTableConfig, rebalanceConfig, null);
int expectedNumSegmentsToAdd = FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS + additionalNumSegments + numSegments;
assertEquals(rebalanceResult.getRebalanceSummaryResult()
.getServerInfo()
.getServerSegmentChangeInfo()
.get(instanceId)
.getSegmentsAdded(), expectedNumSegmentsToAdd);
assertEquals(rebalanceResult.getRebalanceSummaryResult().getSegmentInfo().getMaxSegmentsAddedToASingleServer(),
expectedNumSegmentsToAdd);
preCheckerResult = rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS);
assertNotNull(preCheckerResult);
assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.WARN);
assertEquals(preCheckerResult.getMessage(),
"Number of segments to add to a single server (" + expectedNumSegmentsToAdd + ") is high (>"
+ DefaultRebalancePreChecker.SEGMENT_ADD_THRESHOLD + "). It is recommended to set batchSizePerServer to "
+ DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE
+ " or lower to avoid excessive load on servers.");

rebalanceConfig.setBatchSizePerServer(DefaultRebalancePreChecker.RECOMMENDED_BATCH_SIZE);
rebalanceResult = tableRebalancer.rebalance(newTableConfig, rebalanceConfig, null);
preCheckerResult = rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.REBALANCE_CONFIG_OPTIONS);
assertNotNull(preCheckerResult);
assertEquals(preCheckerResult.getPreCheckStatus(), RebalancePreCheckerResult.PreCheckStatus.PASS);
assertEquals(preCheckerResult.getMessage(), "All rebalance parameters look good");

_helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);

for (int i = 0; i < numServers; i++) {
for (int i = 0; i < numServers + 1; i++) {
stopAndDropFakeInstance("preCheckerRebalanceConfig_" + SERVER_INSTANCE_ID_PREFIX + i);
}
executorService.shutdown();
Expand Down
Loading