Shuffle the segments when rebalancing the table to avoid creating hotspot servers (#5197)

When new servers are added to an existing replica-group based table
and a rebalance is triggered, the current behavior assigns segments
in alphabetical order, which might move only the new segments to
the newly added servers. Because queries tend to hit the most recent
segments, this behavior can turn the newly added servers into
hotspot servers.
To address this issue, we shuffle the segments so that old and new
segments are assigned in a balanced manner.
We use the hash of the table name as the random seed for the shuffle
so that the result is deterministic.
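
To illustrate the approach outside of the Pinot codebase, here is a minimal,
self-contained Java sketch of a deterministic shuffle seeded by the table name
hash; the table name and segment names below are hypothetical examples, not
values taken from this change:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class DeterministicShuffleSketch {
  public static void main(String[] args) {
    // Hypothetical table name and segment list, for illustration only.
    String tableName = "myTable_OFFLINE";
    List<String> segments = new ArrayList<>(
        Arrays.asList("segment_0", "segment_1", "segment_2", "segment_3", "segment_4"));

    // Seed the shuffle with the table name hash so that every rebalance of the
    // same table produces the same segment order.
    Collections.shuffle(segments, new Random(tableName.hashCode()));

    // Old and new segments are now interleaved rather than processed in
    // alphabetical order, so newly added servers receive a mix of both.
    System.out.println(segments);
  }
}

Running the sketch twice prints the same order, which is the property the
rebalancer relies on to keep the new assignment deterministic across runs.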

It is a little tricky to write a test case for this. Since the
change is straightforward and the existing tests already provide
good coverage, no new test is added after manually verifying the
expected behavior.
Jackie-Jiang committed Mar 31, 2020
1 parent 67c30ed commit 95e0f1d
Showing 4 changed files with 41 additions and 24 deletions.
Changed file 1 of 4:
@@ -20,11 +20,12 @@

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.configuration.Configuration;
@@ -214,10 +215,16 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
Preconditions.checkState(instancePartitions.getNumPartitions() == 1,
"Instance partitions: %s should contain 1 partition without partition column",
instancePartitions.getInstancePartitionsName());

// NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
// servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
// name hash as the random seed for the shuffle so that the result is deterministic.
List<String> segments = new ArrayList<>(currentAssignment.keySet());
Collections.shuffle(segments, new Random(_offlineTableName.hashCode()));

newAssignment = new TreeMap<>();
SegmentAssignmentUtils
.rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, currentAssignment.keySet(),
newAssignment);
.rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, segments, newAssignment);
} else {
LOGGER.info("Rebalancing table: {} with partition column: {}", _offlineTableName, _partitionColumn);
newAssignment = rebalanceTableWithPartition(currentAssignment, instancePartitions);
@@ -238,10 +245,18 @@ private Map<String, Map<String, String>> rebalanceTableWithPartition(
for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata);
}
Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
for (String segmentName : currentAssignment.keySet()) {
int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName));
partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName);
partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
}

// NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
// servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
// name hash as the random seed for the shuffle so that the result is deterministic.
Random random = new Random(_offlineTableName.hashCode());
for (List<String> segments : partitionIdToSegmentsMap.values()) {
Collections.shuffle(segments, random);
}

return SegmentAssignmentUtils
Changed file 2 of 4:
@@ -20,11 +20,11 @@

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Random;
import java.util.TreeMap;
import org.apache.commons.configuration.Configuration;
import org.apache.helix.HelixManager;
@@ -202,11 +202,20 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
_realtimeTableName, numReplicaGroups);
}

Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
for (String segmentName : completedSegmentAssignment.keySet()) {
int partitionId = new LLCSegmentName(segmentName).getPartitionId();
partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName);
partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
}

// NOTE: Shuffle the segments within the current assignment to avoid moving only new segments to the new added
// servers, which might cause hotspot servers because queries tend to hit the new segments. Use the table
// name hash as the random seed for the shuffle so that the result is deterministic.
Random random = new Random(_realtimeTableName.hashCode());
for (List<String> segments : partitionIdToSegmentsMap.values()) {
Collections.shuffle(segments, random);
}

newAssignment = SegmentAssignmentUtils
.rebalanceReplicaGroupBasedTable(completedSegmentAssignment, completedInstancePartitions,
partitionIdToSegmentsMap);
Changed file 3 of 4:
@@ -114,10 +114,10 @@ static Map<String, Map<String, String>> rebalanceTableWithHelixAutoRebalanceStra
*/
static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable(
Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions,
Map<Integer, Set<String>> partitionIdToSegmentsMap) {
Map<Integer, List<String>> partitionIdToSegmentsMap) {
Map<String, Map<String, String>> newAssignment = new TreeMap<>();
int numPartitions = instancePartitions.getNumPartitions();
for (Map.Entry<Integer, Set<String>> entry : partitionIdToSegmentsMap.entrySet()) {
for (Map.Entry<Integer, List<String>> entry : partitionIdToSegmentsMap.entrySet()) {
// Uniformly spray the segment partitions over the instance partitions
int partitionId = entry.getKey() % numPartitions;
SegmentAssignmentUtils
@@ -147,7 +147,7 @@ static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable(
* </ul>
*/
static void rebalanceReplicaGroupBasedPartition(Map<String, Map<String, String>> currentAssignment,
InstancePartitions instancePartitions, int partitionId, Set<String> segments,
InstancePartitions instancePartitions, int partitionId, List<String> segments,
Map<String, Map<String, String>> newAssignment) {
// Fetch instances in replica-group 0
List<String> instances = instancePartitions.getInstances(partitionId, 0);
@@ -162,14 +162,9 @@ static void rebalanceReplicaGroupBasedPartition(Map<String, Map<String, String>>
// Do not move segment if target number of segments is not reached, track the segments need to be moved
int[] numSegmentsAssignedPerInstance = new int[numInstances];
List<String> segmentsNotAssigned = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
String segmentName = entry.getKey();
// Skip segments not in the partition
if (!segments.contains(segmentName)) {
continue;
}
for (String segmentName : segments) {
boolean segmentAssigned = false;
for (String instanceName : entry.getValue().keySet()) {
for (String instanceName : currentAssignment.get(segmentName).keySet()) {
Integer instanceId = instanceNameToIdMap.get(instanceName);
if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] < targetNumSegmentsPerInstance) {
newAssignment
@@ -261,9 +256,9 @@ static class CompletedConsumingOfflineSegmentAssignment {
for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) {
_completedSegmentAssignment.put(segmentName, instanceStateMap);
} else if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
} else if (instanceStateMap.containsValue(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) {
_consumingSegmentAssignment.put(segmentName, instanceStateMap);
} else {
_offlineSegmentAssignment.put(segmentName, instanceStateMap);
Changed file 4 of 4:
@@ -21,10 +21,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
@@ -207,7 +205,7 @@ public void testRebalanceReplicaGroupBasedTable() {

int numSegments = 90;
List<String> segments = SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, numSegments);
Map<Integer, Set<String>> partitionIdToSegmentsMap = Collections.singletonMap(0, new HashSet<>(segments));
Map<Integer, List<String>> partitionIdToSegmentsMap = Collections.singletonMap(0, segments);
int numInstances = 9;
List<String> instances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, numInstances);
