Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintain pool selection for the minimizeDataMovement instance partition assignment strategy #11953

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t
replicaGroupStrategyConfig.getNumInstancesPerPartition(), 0, 0, minimizeDataMovement, null);
}

return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, minimizeDataMovement);
}

public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void testSerDe()
InstanceAssignmentConfig instanceAssignmentConfig =
new InstanceAssignmentConfig(new InstanceTagPoolConfig("tenant_OFFLINE", true, 3, null),
new InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2")),
new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null));
new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0, false, null), null, false);
TableConfig tableConfig = tableConfigBuilder.setInstanceAssignmentConfigMap(
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), instanceAssignmentConfig)).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class FDAwareInstancePartitionSelector extends InstancePartitionSelector
private static final Logger LOGGER = LoggerFactory.getLogger(FDAwareInstancePartitionSelector.class);

public FDAwareInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions) {
super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions);
String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions, minimizeDataMovement);
}

/**
Expand Down Expand Up @@ -109,10 +109,7 @@ private Pair<Integer, Integer> processReplicaGroupAssignmentPreconditions(int nu
return new ImmutablePair<>(numReplicaGroups, numInstancesPerReplicaGroup);
}

/**
* Selects instances based on the replica-group/partition config, and stores the result into the given instance
* partitions.
*/
@Override
public void selectInstances(Map<Integer, List<InstanceConfig>> faultDomainToInstanceConfigsMap,
InstancePartitions instancePartitions) {

Expand Down Expand Up @@ -152,7 +149,7 @@ public void selectInstances(Map<Integer, List<InstanceConfig>> faultDomainToInst
* initialize the new replicaGroupBasedAssignmentState for assignment,
* place existing instances in their corresponding positions
*/
if (_replicaGroupPartitionConfig.isMinimizeDataMovement() && _existingInstancePartitions != null) {
if (_minimizeDataMovement) {
int numExistingReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
int numExistingPartitions = _existingInstancePartitions.getNumPartitions();
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public InstancePartitions assignInstances(InstancePartitionsType instancePartiti
}

public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable
InstancePartitions preConfiguredInstancePartitions) {
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
Expand All @@ -88,8 +88,10 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName,
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);

boolean minimizeDataMovement = instanceAssignmentConfig.isMinimizeDataMovement();
InstanceTagPoolSelector tagPoolSelector =
new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType);
new InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(), tableNameWithType,
minimizeDataMovement, existingInstancePartitions);
Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap = tagPoolSelector.selectInstances(instanceConfigs);

InstanceConstraintConfig constraintConfig = instanceAssignmentConfig.getConstraintConfig();
Expand All @@ -106,7 +108,7 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName,
InstancePartitionSelector instancePartitionSelector =
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
preConfiguredInstancePartitions);
preConfiguredInstancePartitions, minimizeDataMovement);
InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
return instancePartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
Expand All @@ -29,12 +30,17 @@ abstract class InstancePartitionSelector {
protected final InstanceReplicaGroupPartitionConfig _replicaGroupPartitionConfig;
protected final String _tableNameWithType;
protected final InstancePartitions _existingInstancePartitions;
protected final boolean _minimizeDataMovement;

public InstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig,
String tableNameWithType, InstancePartitions existingInstancePartitions) {
String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, boolean minimizeDataMovement) {
_replicaGroupPartitionConfig = replicaGroupPartitionConfig;
_tableNameWithType = tableNameWithType;
_existingInstancePartitions = existingInstancePartitions;
// For backward compatibility, enable minimize data movement when it is enabled in top level or instance partition
// selector level
_minimizeDataMovement = (minimizeDataMovement || replicaGroupPartitionConfig.isMinimizeDataMovement())
&& existingInstancePartitions != null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.assignment.instance;

import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
Expand All @@ -31,25 +32,18 @@ private InstancePartitionSelectorFactory() {

public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
InstancePartitions existingInstancePartitions) {
return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, null);
}

public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions
) {
InstancePartitions existingInstancePartitions, @Nullable InstancePartitions preConfiguredInstancePartitions,
boolean minimizeDataMovement) {
switch (partitionSelector) {
case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
return new FDAwareInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
existingInstancePartitions, minimizeDataMovement);
case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
existingInstancePartitions, minimizeDataMovement);
case MIRROR_SERVER_SET_PARTITION_SELECTOR:
return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, preConfiguredInstancePartitions);
existingInstancePartitions, preConfiguredInstancePartitions, minimizeDataMovement);
default:
throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from"
+ Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
Expand Down