diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java index b37429c527f..30a3a19f20a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java @@ -124,4 +124,14 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig); } + + public static boolean isMirrorServerSetAssignment(TableConfig tableConfig, + InstancePartitionsType instancePartitionsType) { + // If the instance assignment config is not null and the partition selector is + // MIRROR_SERVER_SET_PARTITION_SELECTOR, + return tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString()) != null + && InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, instancePartitionsType) + .getPartitionSelector() + == InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java index 3eeb6665a49..282431e04bf 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java @@ -69,7 +69,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; +import static org.apache.pinot.spi.utils.CommonConstants.*; @Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)}) @@ -244,20 +244,38 @@ public Map assignInstances( private void assignInstancesForInstancePartitionsType(Map instancePartitionsMap, TableConfig tableConfig, List instanceConfigs, InstancePartitionsType instancePartitionsType) { String tableNameWithType = tableConfig.getTableName(); - if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) { - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + if (!TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) { + InstancePartitions existingInstancePartitions = + InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(), + InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString())); instancePartitionsMap.put(instancePartitionsType.toString(), - InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(), - tableConfig.getInstancePartitionsMap().get(instancePartitionsType), - instancePartitionsType.getInstancePartitionsName(rawTableName))); - return; - } - InstancePartitions existingInstancePartitions = - InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(), + new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs, + existingInstancePartitions)); + } else { + if (InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType)) { + // fetch the existing instance partitions, if the table, this is referenced in the new instance partitions + // generation for minimum difference + InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions( + _resourceManager.getHelixZkManager().getHelixPropertyStore(), InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString())); - instancePartitionsMap.put(instancePartitionsType.toString(), - new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs, - existingInstancePartitions)); + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + // fetch the pre-configured instance partitions, the renaming part is irrelevant as we are not really + // preserving this preConfigured, but only using it as a reference to generate the new instance partitions + InstancePartitions preConfigured = + InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(), + tableConfig.getInstancePartitionsMap().get(instancePartitionsType), + instancePartitionsType.getInstancePartitionsName(rawTableName)); + instancePartitionsMap.put(instancePartitionsType.toString(), + new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs, + existingInstancePartitions, preConfigured)); + } else { + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + instancePartitionsMap.put(instancePartitionsType.toString(), + InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(), + tableConfig.getInstancePartitionsMap().get(instancePartitionsType), + instancePartitionsType.getInstancePartitionsName(rawTableName))); + } + } } private void assignInstancesForTier(Map instancePartitionsMap, TableConfig tableConfig, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java index 3aa5da48e91..8166427a938 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java @@ -30,6 +30,7 @@ import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; +import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -50,6 +51,8 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.metadata.controllerjob.ControllerJobType; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -68,13 +71,14 @@ import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType; import org.apache.pinot.spi.config.tenant.Tenant; import org.apache.pinot.spi.config.tenant.TenantRole; import org.apache.pinot.spi.utils.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; +import static org.apache.pinot.spi.utils.CommonConstants.*; /** @@ -286,6 +290,82 @@ public String getTablesOnTenant( } } + @GET + @Path("/tenants/{tenantName}/instancePartitions") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_INSTANCE_PARTITIONS) + @Authenticate(AccessType.READ) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get the instance partitions of a tenant") + @ApiResponses(value = {@ApiResponse(code = 200, message = "Success", response = InstancePartitions.class), + @ApiResponse(code = 404, message = "Instance partitions not found")}) + public InstancePartitions getInstancePartitions( + @ApiParam(value = "Tenant name ", required = true) @PathParam("tenantName") String tenantName, + @ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)", required = true, + allowableValues = "OFFLINE, CONSUMING, COMPLETED") + @QueryParam("instancePartitionType") String instancePartitionType) { + String tenantNameWithType = InstancePartitionsType.valueOf(instancePartitionType) + .getInstancePartitionsName(tenantName); + InstancePartitions instancePartitions = + InstancePartitionsUtils.fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(), + tenantNameWithType); + + if (instancePartitions == null) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to find the instance partitions for %s", tenantNameWithType), + Response.Status.NOT_FOUND); + } else { + return instancePartitions; + } + } + + @PUT + @Path("/tenants/{tenantName}/instancePartitions") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_INSTANCE_PARTITIONS) + @Authenticate(AccessType.UPDATE) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Update an instance partition for a server type in a tenant") + @ApiResponses(value = {@ApiResponse(code = 200, message = "Success", response = InstancePartitions.class), + @ApiResponse(code = 400, message = "Failed to deserialize/validate the instance partitions"), + @ApiResponse(code = 500, message = "Error updating the tenant")}) + public InstancePartitions assignInstancesPartitionMap( + @ApiParam(value = "Tenant name ", required = true) @PathParam("tenantName") String tenantName, + @ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)", required = true, + allowableValues = "OFFLINE, CONSUMING, COMPLETED") + @QueryParam("instancePartitionType") String instancePartitionType, + String instancePartitionsStr) { + InstancePartitions instancePartitions; + try { + instancePartitions = JsonUtils.stringToObject(instancePartitionsStr, InstancePartitions.class); + } catch (IOException e) { + throw new ControllerApplicationException(LOGGER, "Failed to deserialize the instance partitions", + Response.Status.BAD_REQUEST); + } + + String inputTenantName = InstancePartitionsType.valueOf(instancePartitionType) + .getInstancePartitionsName(tenantName); + + if (!instancePartitions.getInstancePartitionsName().equals(inputTenantName)) { + throw new ControllerApplicationException(LOGGER, "Instance partitions name mismatch, expected: " + + inputTenantName + + ", got: " + instancePartitions.getInstancePartitionsName(), Response.Status.BAD_REQUEST); + } + + persistInstancePartitionsHelper(instancePartitions); + return instancePartitions; + } + + private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) { + try { + LOGGER.info("Persisting instance partitions: {}", instancePartitions); + InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(), + instancePartitions); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions", + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + private String getTablesServedFromServerTenant(String tenantName) { Set tables = new HashSet<>(); ObjectNode resourceGetRet = JsonUtils.newObjectNode(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 860a9e1f830..d3cf819bf76 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1735,20 +1735,30 @@ private void assignInstances(TableConfig tableConfig, boolean override) { for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) { boolean hasPreConfiguredInstancePartitions = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType); + boolean isPreConfigurationBasedAssignment = + InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType); InstancePartitions instancePartitions; if (!hasPreConfiguredInstancePartitions) { instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null); LOGGER.info("Persisting instance partitions: {}", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); } else { String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType); - instancePartitions = - InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, referenceInstancePartitionsName, - instancePartitionsType.getInstancePartitionsName(rawTableName)); - LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, - referenceInstancePartitionsName); - InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); + if (isPreConfigurationBasedAssignment) { + InstancePartitions preConfiguredInstancePartitions = + InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, + referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName)); + instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null, + preConfiguredInstancePartitions); + LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions, + preConfiguredInstancePartitions); + } else { + instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, + referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName)); + LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, + referenceInstancePartitionsName); + } } + InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java index 7a5c9010293..6d869b86c16 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java @@ -60,19 +60,31 @@ public InstancePartitions assignInstances(InstancePartitionsType instancePartiti InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); return getInstancePartitions( instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), - assignmentConfig, instanceConfigs, existingInstancePartitions); + assignmentConfig, instanceConfigs, existingInstancePartitions, null); + } + + public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType, + List instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable + InstancePartitions preConfiguredInstancePartitions) { + String tableNameWithType = _tableConfig.getTableName(); + InstanceAssignmentConfig assignmentConfig = + InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType); + return getInstancePartitions( + instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)), + assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions); } public InstancePartitions assignInstances(String tierName, List instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) { return getInstancePartitions( InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName), - instanceAssignmentConfig, instanceConfigs, existingInstancePartitions); + instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null); } private InstancePartitions getInstancePartitions(String instancePartitionsName, InstanceAssignmentConfig instanceAssignmentConfig, List instanceConfigs, - @Nullable InstancePartitions existingInstancePartitions) { + @Nullable InstancePartitions existingInstancePartitions, + @Nullable InstancePartitions preConfiguredInstancePartitions) { String tableNameWithType = _tableConfig.getTableName(); LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType); @@ -93,7 +105,8 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName, InstancePartitionSelector instancePartitionSelector = InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(), - instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions); + instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions, + preConfiguredInstancePartitions); InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName); instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions); return instancePartitions; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java index f786ffe0b41..256aa89b023 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java @@ -31,7 +31,14 @@ private InstancePartitionSelectorFactory() { public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector, InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType, - InstancePartitions existingInstancePartitions + InstancePartitions existingInstancePartitions) { + return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType, + existingInstancePartitions, null); + } + + public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector, + InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType, + InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions ) { switch (partitionSelector) { case FD_AWARE_INSTANCE_PARTITION_SELECTOR: @@ -40,6 +47,9 @@ public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.Par case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR: return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + case MIRROR_SERVER_SET_PARTITION_SELECTOR: + return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType, + existingInstancePartitions, preConfiguredInstancePartitions); default: throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from" + Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values())); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java new file mode 100644 index 00000000000..6b4086615a5 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.instance; + +import com.google.common.base.Preconditions; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.assignment.InstancePartitions; +import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Detailed design see https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8 + * During each creation/update/scale, the algorithm will refer to the corresponding tenant level instance partitions and + * generate an instance partition by taking numInstancePerReplicaGroup mirror server sets from the tenant level + * instance partitions. + * + * If an existingInstancePartition is provided, the algorithm will generate a best effort assignment that resembles + * the existingInstancePartition. + * + * Assumptions for this algorithm: + * 1. The number of replica groups in the tenant level instance partitions is the same as the number of replica groups + * in the table config. + * 2. The number of partitions at replica group level is 1 + * 3. This algorithm only works for replica group based table assignment + */ +public class MirrorServerSetInstancePartitionSelector extends InstancePartitionSelector { + private static final Logger LOGGER = LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class); + private final InstancePartitions _preConfiguredInstancePartitions; + + // dimensions of target instance partition + private final int _numTargetInstancesPerReplicaGroup; + private final int _numTargetReplicaGroups; + private final int _numTargetTotalInstances; + // look up tables for pre-configured instance partition + private final List> _preConfiguredMirroredServerLists = new ArrayList<>(); + private final Map _preConfiguredInstanceNameToOffsetMap = new HashMap<>(); + private final List> _existingMirroredServerLists = new ArrayList<>(); + // dimensions of pre-configured instance partition + private int _numPreConfiguredReplicaGroups; + private int _numPreConfiguredInstancesPerReplicaGroup; + // dimensions of existing instance partition + private int _numExistingReplicaGroups; + private int _numExistingInstancesPerReplicaGroup; + + public MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, + String tableNameWithType, @Nullable InstancePartitions existingInstancePartitions, + InstancePartitions preConfiguredInstancePartitions) { + super(replicaGroupPartitionConfig, tableNameWithType, existingInstancePartitions); + _preConfiguredInstancePartitions = preConfiguredInstancePartitions; + _numTargetInstancesPerReplicaGroup = _replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup(); + _numTargetReplicaGroups = _replicaGroupPartitionConfig.getNumReplicaGroups(); + _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup * _numTargetReplicaGroups; + } + + /** + * validate if the poolToInstanceConfigsMap is a valid input for pre-configuration based replica-group selection + */ + private void validatePoolDiversePreconditions(Map> poolToInstanceConfigsMap) { + + LOGGER.info("Validating pre-configured instance partitions for pre-configuration based replica-group selection"); + + // numTargetInstancesPerReplica should be positive + LOGGER.info("Number of instances per replica: {}", _numTargetInstancesPerReplicaGroup); + Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0, + "Number of instances per replica must be positive"); + + // _numTargetReplicaGroups should be positive + LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups); + Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of replica-groups must be positive"); + + // validate target partition count is 1 + LOGGER.info("Number of partitions: {}", _replicaGroupPartitionConfig.getNumPartitions()); + Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions() <= 1, + "This algorithm does not support table level partitioning for target assignment"); + + // Validate the existing instance partitions is null or has only one partition + LOGGER.info("Number of partitions in existing instance partitions: {}", + _existingInstancePartitions == null ? 0 : _existingInstancePartitions.getNumPartitions()); + Preconditions.checkState( + (_existingInstancePartitions == null || _existingInstancePartitions.getNumPartitions() == 1), + "This algorithm does not support replica group level partitioning for existing assignment"); + + _numExistingReplicaGroups = + _existingInstancePartitions == null ? 0 : _existingInstancePartitions.getNumReplicaGroups(); + _numExistingInstancesPerReplicaGroup = + _existingInstancePartitions == null ? 0 : _existingInstancePartitions.getInstances(0, 0).size(); + + // Validate the pre-configured instance partitions is not null and has only one partition + Preconditions.checkState(_preConfiguredInstancePartitions != null, + "Pre-configured instance partitions must be provided for pre-configuration based selection"); + LOGGER.info("Number of partitions in pre-configured instance partitions: {}", + _preConfiguredInstancePartitions.getNumPartitions()); + Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() == 1, + "This algorithm does not support table level partitioning for pre-configured assignment"); + + // Validate the number of replica-groups in the pre-configured instance partitions is equal to the target + // number of replica-groups + _numPreConfiguredReplicaGroups = _preConfiguredInstancePartitions.getNumReplicaGroups(); + LOGGER.info("Number of replica-groups in pre-configured instance partitions: {}", _numPreConfiguredReplicaGroups); + Preconditions.checkState(_numPreConfiguredReplicaGroups == _numTargetReplicaGroups, + "The number of replica-groups %s in the pre-configured instance partitions " + + "is not equal to the target number of replica-groups %s", _numPreConfiguredReplicaGroups, + _numTargetReplicaGroups); + + // Validate the number of instances per replica-group in the pre-configured instance partitions is greater than or + // equal to the target number of instances per replica-group + _numPreConfiguredInstancesPerReplicaGroup = _preConfiguredInstancePartitions.getInstances(0, 0).size(); + LOGGER.info("Number of instances per replica-group in pre-configured instance partitions: {}, target number of " + + "instances per replica-group: {}", _numPreConfiguredInstancesPerReplicaGroup, + _numTargetInstancesPerReplicaGroup); + Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >= _numTargetInstancesPerReplicaGroup, + "The number of instances per replica-group in the pre-configured " + + "instance partitions is less than the target number of instances per replica-group %s", + _numTargetInstancesPerReplicaGroup); + + // Validate the pool to instance configs map is not null or empty + Preconditions.checkNotNull(poolToInstanceConfigsMap, "poolToInstanceConfigsMap is null"); + int numPools = poolToInstanceConfigsMap.size(); + Preconditions.checkState(numPools > 0, "No pool qualified for selection"); + Integer totalInstanceCount = poolToInstanceConfigsMap.values().stream().map(List::size) + .reduce(Integer::sum).orElse(0); + LOGGER.info("Total number of instances in all pools: {}, target number of instances: {}", totalInstanceCount, + _numTargetTotalInstances); + Preconditions.checkState(totalInstanceCount + >= _numTargetTotalInstances, + "The total number of instances in all pools is less than the target number of target instances"); + + HashSet availableInstanceSet = new HashSet<>(); + poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i -> availableInstanceSet.add(i.getInstanceName()))); + LOGGER.info("Number of pools: {}", numPools); + LOGGER.info("Number of instances in all pools: {}", availableInstanceSet.size()); + LOGGER.info("availableInstanceSet: {}", availableInstanceSet); + + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + List instances = _preConfiguredInstancePartitions.getInstances(0, i); + for (String instance : instances) { + Preconditions.checkState(availableInstanceSet.contains(instance), + "Instance %s in pre-configured instance partitions is not in " + + "the pool to instance configs map", + instance); + } + } + + LOGGER.info("Validation passed. The instances provided can satisfy the pool diverse requirement."); + LOGGER.info("Trying to assign total {} instances to {} replica groups, " + "with {} instance per replica group", + _numTargetTotalInstances, _numTargetReplicaGroups, _numTargetInstancesPerReplicaGroup); + } + + private void createMirrorServerListFromPreconfiguredInstancePartition() { + List> preConfiguredReplicaGroups = new ArrayList<>(_numPreConfiguredReplicaGroups); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0, i)); + } + + for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) { + List mirroredServerList = new ArrayList<>(); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + mirroredServerList.add(preConfiguredReplicaGroups.get(i).get(j)); + } + _preConfiguredMirroredServerLists.add(mirroredServerList); + } + } + + private void createMirrorServerListLookupTablesFromPreconfiguredInstancePartition() { + List> preConfiguredReplicaGroups = new ArrayList<>(_numPreConfiguredReplicaGroups); + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0, i)); + } + + for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) { + for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) { + String instance = preConfiguredReplicaGroups.get(i).get(j); + _preConfiguredInstanceNameToOffsetMap.put(instance, j); + } + } + } + + @Override + public void selectInstances(Map> poolToInstanceConfigsMap, + InstancePartitions instancePartitions) { + // throw exception instantly if not replica-group based + if (!_replicaGroupPartitionConfig.isReplicaGroupBased()) { + throw new IllegalStateException("Does not support Non-replica-group based selection"); + } + + validatePoolDiversePreconditions(poolToInstanceConfigsMap); + if (_existingInstancePartitions == null) { + // If no existing instance partitions, create new instance partitions based on the pre-configured instance + // partitions. This is done by just selecting _targetNumInstancesPerReplicaGroup set of mirrored servers + // from the pre-configured instance partitions. + initialAssignment(instancePartitions); + } else { + // If existing instance partitions exist, adjust the existing instance partitions based on the pre-configured + // instance partitions. This code path takes care of instance replacement, uplift, and downlift. + // This is done by search in the pre-configured instance partitions for the mirrored + // servers sets that are similar to the existing sets in instance partitions. + scale(instancePartitions); + } + } + + private void initialAssignment(InstancePartitions instancePartitions) { + LOGGER.info("No existing instance partitions found. Will build new on top of" + + " the pre-configured instance partitions"); + // create a list of lists of mirrored servers from the pre-configured instance partitions + createMirrorServerListFromPreconfiguredInstancePartition(); + // shuffle the list of lists of mirrored servers based on the table name hash + int tableNameHash = Math.abs(_tableNameWithType.hashCode()); + // initialize a list of indices from 0 to _numPreConfiguredInstancesPerReplicaGroup + List shuffledIndex = new ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup); + for (int i = 0; i < _numPreConfiguredInstancesPerReplicaGroup; i++) { + shuffledIndex.add(i); + } + // shuffle the list of indices based on the table name hash + Collections.shuffle(shuffledIndex, new Random(tableNameHash)); + // select the first _numTargetInstancesPerReplicaGroup indices + shuffledIndex = shuffledIndex.subList(0, _numTargetInstancesPerReplicaGroup); + // sort the list of indices so that they follow the original order of the pre-configured instance partitions + shuffledIndex.sort(Comparator.naturalOrder()); + + // create the instance partitions based on the shuffled list of mirrored servers + List> resultReplicaGroups = new ArrayList<>(_numTargetReplicaGroups); + for (int i = 0; i < _numTargetReplicaGroups; i++) { + resultReplicaGroups.add(new ArrayList<>(_numTargetInstancesPerReplicaGroup)); + } + + // populate the instance partitions with the selected mirrored servers + for (int j = 0; j < _numTargetInstancesPerReplicaGroup; j++) { + for (int i = 0; i < _numTargetReplicaGroups; i++) { + resultReplicaGroups.get(i).add(_preConfiguredMirroredServerLists.get(shuffledIndex.get(j)).get(i)); + } + } + for (int i = 0; i < _numTargetReplicaGroups; i++) { + instancePartitions.setInstances(0, i, resultReplicaGroups.get(i)); + } + } + + private void scale(InstancePartitions instancePartitions) { + LOGGER.info("Existing instance partitions found. Will adjust the existing instance partitions" + + " based on the pre-configured instance partitions"); + createMirrorServerListFromPreconfiguredInstancePartition(); + createMirrorServerListLookupTablesFromPreconfiguredInstancePartition(); + createListAndLookupTablesFromExistingInstancePartitions(); + Set usedPreconfiguredInstanceOffsets = new HashSet<>(); + Map> existingOffsetToResultTuple = new HashMap<>(); + + // For each instance offset, find the mirrored server that is most similar to the existing mirrored server + // set. If this mirrored server is not used, add it to the result list. + for (int j = 0; j < _numExistingInstancesPerReplicaGroup; j++) { + List existingMirroredServers = _existingMirroredServerLists.get(j); + int finalJ = j; + existingMirroredServers.stream() + .map(_preConfiguredInstanceNameToOffsetMap::get) + .filter(Objects::nonNull) + .filter(offset -> !usedPreconfiguredInstanceOffsets.contains(offset)) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())) + .entrySet() + .stream() + .max(Map.Entry.comparingByValue()) + .ifPresent(e -> { + existingOffsetToResultTuple.put(finalJ, e); + usedPreconfiguredInstanceOffsets.add(e.getKey()); + }); + } + + if (_numExistingInstancesPerReplicaGroup > _numTargetInstancesPerReplicaGroup) { + // If this is a downlift case + List> collect = existingOffsetToResultTuple.values() + .stream() + .sorted((a, b) -> b.getValue().compareTo(a.getValue())) + .limit(_numTargetInstancesPerReplicaGroup) + .collect(Collectors.toList()); + int size = collect.size(); + existingOffsetToResultTuple.clear(); + usedPreconfiguredInstanceOffsets.clear(); + for (int j = 0; j < size; j++) { + existingOffsetToResultTuple.put(j, collect.get(j)); + usedPreconfiguredInstanceOffsets.add(collect.get(j).getKey()); + } + } + + if (existingOffsetToResultTuple.size() < _numTargetInstancesPerReplicaGroup) { + // If the number of instances selected from the result list is less than the target number + // of instances per replica group, add the remaining instances from the pre-configured instance partitions. + List shuffledOffsets = new ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup); + for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) { + shuffledOffsets.add(j); + } + for (Map.Entry> entry : existingOffsetToResultTuple.entrySet()) { + shuffledOffsets.remove(entry.getValue().getKey()); + } + Collections.shuffle(shuffledOffsets, new Random(Math.abs(_tableNameWithType.hashCode()))); + shuffledOffsets = + shuffledOffsets.subList(0, _numTargetInstancesPerReplicaGroup - existingOffsetToResultTuple.size()); + shuffledOffsets.sort(Comparator.naturalOrder()); + for (int k = 0, j = 0; j < _numTargetInstancesPerReplicaGroup; j++) { + if (existingOffsetToResultTuple.containsKey(j)) { + continue; + } + Integer offset = shuffledOffsets.get(k++); + existingOffsetToResultTuple.put(j, new AbstractMap.SimpleEntry<>(offset, 0L)); + usedPreconfiguredInstanceOffsets.add(offset); + } + } + + List> resultReplicaGroups = new ArrayList<>(_numTargetReplicaGroups); + for (int i = 0; i < _numTargetReplicaGroups; i++) { + resultReplicaGroups.add(new ArrayList<>(_numTargetInstancesPerReplicaGroup)); + } + for (int j = 0; j < _numTargetInstancesPerReplicaGroup; j++) { + List mirrorServers = _preConfiguredMirroredServerLists.get(existingOffsetToResultTuple.get(j).getKey()); + for (int i = 0; i < _numTargetReplicaGroups; i++) { + resultReplicaGroups.get(i).add(mirrorServers.get(i)); + } + } + for (int i = 0; i < _numTargetReplicaGroups; i++) { + instancePartitions.setInstances(0, i, resultReplicaGroups.get(i)); + } + } + + private void createListAndLookupTablesFromExistingInstancePartitions() { + List> existingReplicaGroups = new ArrayList<>(_numExistingReplicaGroups); + for (int i = 0; i < _numExistingReplicaGroups; i++) { + existingReplicaGroups.add(_existingInstancePartitions.getInstances(0, i)); + } + + for (int j = 0; j < _numExistingInstancesPerReplicaGroup; j++) { + List existingMirroredServerList = new ArrayList<>(); + for (int i = 0; i < _numExistingReplicaGroups; i++) { + existingMirroredServerList.add(existingReplicaGroups.get(i).get(j)); + } + _existingMirroredServerLists.add(existingMirroredServerList); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index a86c3e27a16..efd04d3bd61 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -527,31 +527,52 @@ private Pair getInstancePartitions(TableConfig tabl if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) { boolean hasPreConfiguredInstancePartitions = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType); - if (hasPreConfiguredInstancePartitions) { - String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType); - InstancePartitions instancePartitions = - InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), - referenceInstancePartitionsName, instancePartitionsName); - boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); + boolean isPreConfigurationBasedAssignment = + InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType); + InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig); + InstancePartitions instancePartitions; + boolean instancePartitionsUnchanged; + if (!hasPreConfiguredInstancePartitions) { + LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType); + // Assign instances with existing instance partition to null if bootstrap mode is enabled, so that the + // instance partition map can be fully recalculated. + instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, + _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), + bootstrap ? null : existingInstancePartitions); + instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); if (!dryRun && !instancePartitionsUnchanged) { - LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, - referenceInstancePartitionsName); + LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); } - return Pair.of(instancePartitions, instancePartitionsUnchanged); - } - LOGGER.info("Reassigning {} instances for table: {}", instancePartitionsType, tableNameWithType); - InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig); - // Assign instances with existing instance partition to null if bootstrap mode is enabled, so that the instance - // partition map can be fully recalculated. - InstancePartitions instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, - _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), - bootstrap ? null : existingInstancePartitions); - boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); - if (!dryRun && !instancePartitionsUnchanged) { - LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); + } else { + String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType); + if (isPreConfigurationBasedAssignment) { + InstancePartitions preConfiguredInstancePartitions = + InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), + referenceInstancePartitionsName, instancePartitionsName); + instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, + _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true), + bootstrap ? null : existingInstancePartitions, preConfiguredInstancePartitions); + instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); + if (!dryRun && !instancePartitionsUnchanged) { + LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions, + preConfiguredInstancePartitions); + InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), + instancePartitions); + } + } else { + instancePartitions = + InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(), + referenceInstancePartitionsName, instancePartitionsName); + instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); + if (!dryRun && !instancePartitionsUnchanged) { + LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, + referenceInstancePartitionsName); + InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), + instancePartitions); + } + } } return Pair.of(instancePartitions, instancePartitionsUnchanged); } else { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java index 4335d80b147..b25a529e101 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java @@ -18,12 +18,20 @@ */ package org.apache.pinot.controller.helix.core.assignment.instance; +import java.io.FileNotFoundException; +import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Random; +import java.util.Set; import org.apache.helix.model.InstanceConfig; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.utils.config.InstanceUtils; @@ -42,9 +50,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; public class InstanceAssignmentTest { @@ -54,6 +60,7 @@ public class InstanceAssignmentTest { private static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_"; private static final String SERVER_INSTANCE_POOL_PREFIX = "_pool_"; private static final String TABLE_NAME_ZERO_HASH_COMPLEMENT = "12"; + public static final Logger LOGGER = LogManager.getLogger(InstanceAssignmentTest.class); @Test public void testDefaultOfflineReplicaGroup() { @@ -329,6 +336,957 @@ public void testDefaultOfflineReplicaGroup() { Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 0)); } + public void testMirrorServerSetBasedRandom() throws FileNotFoundException { + testMirrorServerSetBasedRandomInner(10000000); + } + + public void testMirrorServerSetBasedRandomInner(int loopCount) throws FileNotFoundException { + PrintStream o = new PrintStream("output.txt"); + System.setOut(o); + for (int iter = 0; iter < loopCount; iter++) { + System.out.printf("_____________________________ITERATION:%d________________________________%n", iter); + Random random1 = new Random(); + int numTargetReplicaGroups = random1.nextInt(7) + 1; + int numExistingReplicaGroups = random1.nextInt(7) + 1; + int numPreConfiguredInstancesPerReplicaGroup = random1.nextInt(10) + 5; + int numTargetInstancesPerReplicaGroup = Math.max(random1.nextInt(numPreConfiguredInstancesPerReplicaGroup), 5); + int numExistingInstancesPerReplicaGroup = Math.max(random1.nextInt(numPreConfiguredInstancesPerReplicaGroup), 5); + int numPools = random1.nextInt(10) + 1; + + int numPartitions = 0; + int numInstancesPerPartition = 0; + List instanceConfigs = new ArrayList<>(); + + int preConfiguredOffsetStart = random1.nextInt(10); + for (int i = 0; i < 1000; i++) { + int pool = i % numPools; + InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + InstanceReplicaGroupPartitionConfig replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numTargetReplicaGroups, numTargetInstancesPerReplicaGroup, + numPartitions, numInstancesPerPartition, false, null); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")) + .build(); + InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); + InstancePartitions preConfigured = new InstancePartitions("preConfigured"); + InstancePartitions existing = new InstancePartitions("existing"); + + List preconfiguredInstances = new LinkedList<>(); + List existingInstances = new LinkedList<>(); + + Set preConfiguredUsed = new HashSet<>(); + Set existingUsed = new HashSet<>(); + + for (int i = 0; i < numTargetReplicaGroups; i++) { + for (int j = 0; j < numPreConfiguredInstancesPerReplicaGroup; j++) { + int instance = + random1.nextInt((int) (1.5 * numTargetReplicaGroups * numPreConfiguredInstancesPerReplicaGroup)); + while (preConfiguredUsed.contains(instance)) { + instance = random1.nextInt((int) (1.5 * numTargetReplicaGroups * numPreConfiguredInstancesPerReplicaGroup)); + } + preConfiguredUsed.add(instance); + preconfiguredInstances.add(SERVER_INSTANCE_ID_PREFIX + (instance + preConfiguredOffsetStart)); + } + } + + for (int i = 0; i < numExistingReplicaGroups; i++) { + for (int j = 0; j < numExistingInstancesPerReplicaGroup; j++) { + int instance = random1.nextInt((int) (1.5 * numExistingReplicaGroups * numExistingInstancesPerReplicaGroup)); + while (existingUsed.contains(instance)) { + instance = random1.nextInt((int) (1.5 * numExistingReplicaGroups * numExistingInstancesPerReplicaGroup)); + } + existingUsed.add(instance); + existingInstances.add(SERVER_INSTANCE_ID_PREFIX + instance); + } + } + + Collections.shuffle(preconfiguredInstances); + Collections.shuffle(existingInstances); + + for (int i = 0; i < numTargetReplicaGroups; i++) { + preConfigured.setInstances(0, i, preconfiguredInstances.subList(i * numPreConfiguredInstancesPerReplicaGroup, + (i + 1) * numPreConfiguredInstancesPerReplicaGroup)); + } + + for (int i = 0; i < numExistingReplicaGroups; i++) { + existing.setInstances(0, i, existingInstances.subList(i * numExistingInstancesPerReplicaGroup, + (i + 1) * numExistingInstancesPerReplicaGroup)); + } + + System.out.println("Done initializing preconfigured and existing instances"); + System.out.println("numTargetReplicaGroups " + numTargetReplicaGroups); + System.out.println("numPreConfiguredInstancesPerReplicaGroup " + numPreConfiguredInstancesPerReplicaGroup); + System.out.println("numTargetInstancesPerReplicaGroup " + numTargetInstancesPerReplicaGroup); + + System.out.println("numExistingReplicaGroups " + numExistingReplicaGroups); + System.out.println("numExistingInstancesPerReplicaGroup " + numExistingInstancesPerReplicaGroup); + System.out.println(""); + for (int i = 0; i < numTargetReplicaGroups; i++) { + System.out.println("Preconfigured instances for replica group " + i + " : " + preConfigured.getInstances(0, i)); + } + System.out.println(""); + for (int i = 0; i < numExistingReplicaGroups; i++) { + System.out.println("Existing instances for replica group " + i + " : " + existing.getInstances(0, i)); + } + System.out.println(""); + InstancePartitions instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existing, preConfigured); + assertEquals(instancePartitions.getNumReplicaGroups(), numTargetReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + + for (int i = 0; i < numTargetReplicaGroups; i++) { + System.out.println("Assigned instances for replica group " + i + " : " + instancePartitions.getInstances(0, i)); + } + } + } + + @Test + public void testMirrorServerSetBased() { + LogManager.getLogger(MirrorServerSetInstancePartitionSelector.class) + .setLevel(Level.INFO); + + // Test initial assignment 3 replica groups, 7 instances per rg. + int numPartitions = 0; + int numInstancesPerPartition = 0; + int numInstances = 21; + int numPools = 5; + int numReplicaGroups = 3; + int numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + List instanceConfigs = new ArrayList<>(numInstances); + for (int i = 0; i < 100; i++) { + int pool = i % numPools; + InstanceConfig instanceConfig = + new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i); + instanceConfig.addTag(OFFLINE_TAG); + instanceConfig.getRecord() + .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool))); + instanceConfigs.add(instanceConfig); + } + InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + InstanceReplicaGroupPartitionConfig replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig); + InstancePartitions preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15, + SERVER_INSTANCE_ID_PREFIX + 18)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 16, + SERVER_INSTANCE_ID_PREFIX + 19)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14, SERVER_INSTANCE_ID_PREFIX + 17, + SERVER_INSTANCE_ID_PREFIX + 20)); + + InstancePartitions instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, null, preConfigured); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + /* + * Pre-configured partitioning: + * RG1 RG2 RG3 + * Host 0 1 2 + * Host 3 4 5 + * Host 6 7 8 + * Host 9 10 11 + * Host 12 13 14 + * Host 15 16 17 + * Host 18 19 20 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 0 1 2 + * Host 3 4 5 + * Host 6 7 8 + * Host 9 10 11 + * Host 12 13 14 + * Host 15 16 17 + * Host 18 19 20 + */ + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 3, + SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 12, + SERVER_INSTANCE_ID_PREFIX + 15, + SERVER_INSTANCE_ID_PREFIX + 18)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, + SERVER_INSTANCE_ID_PREFIX + 13, + SERVER_INSTANCE_ID_PREFIX + 16, + SERVER_INSTANCE_ID_PREFIX + 19)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 14, + SERVER_INSTANCE_ID_PREFIX + 17, + SERVER_INSTANCE_ID_PREFIX + 20)); + + // Test instance shuffling/uplifting from 3*5 to 3*7 + numPartitions = 0; + numInstancesPerPartition = 0; + numInstances = 21; + numReplicaGroups = 3; + numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + driver = new InstanceAssignmentDriver(tableConfig); + preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15, + SERVER_INSTANCE_ID_PREFIX + 18)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 16, + SERVER_INSTANCE_ID_PREFIX + 19)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14, SERVER_INSTANCE_ID_PREFIX + 17, + SERVER_INSTANCE_ID_PREFIX + 20)); + + InstancePartitions existingInstancePartitions = new InstancePartitions("existing"); + existingInstancePartitions.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9)); + existingInstancePartitions.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10)); + existingInstancePartitions.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 14, SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, + existingInstancePartitions, preConfigured); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + /* + * uplift from 15 instances in 3 replicas to 21 instance in 3 replicas + * 21 instances in 4 pools + * Pre-configured partitioning: + * RG1 RG2 RG3 + * Host 0 1 2 + * Host 3 4 5 + * Host 6 7 8 + * Host 9 10 11 + * Host 12 13 14 + * Host 15 16 17 + * Host 18 19 20 + * + * Existing configured partitioning: + * RG1 RG2 RG3 + * Host 0 6 2 + * Host 12 7 14 + * Host 1 4 5 + * Host 3 13 8 + * Host 9 10 11 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 0 1 2 + * Host 12 13 14 + * Host 3 4 5 + * Host 6 7 8 + * Host 9 10 11 + * Host 15 16 17 + * Host 18 19 20 + */ + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 12, + SERVER_INSTANCE_ID_PREFIX + 3, + SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 15, + SERVER_INSTANCE_ID_PREFIX + 18)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 13, + SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, + SERVER_INSTANCE_ID_PREFIX + 16, + SERVER_INSTANCE_ID_PREFIX + 19)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 14, + SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 17, + SERVER_INSTANCE_ID_PREFIX + 20)); + + // Test instance replacement from 3*6 to 3*5 + numPartitions = 0; + numInstancesPerPartition = 0; + numInstances = 15; + numReplicaGroups = 3; + numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + driver = new InstanceAssignmentDriver(tableConfig); + preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 24, + SERVER_INSTANCE_ID_PREFIX + 27, SERVER_INSTANCE_ID_PREFIX + 30)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 19, SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 25, + SERVER_INSTANCE_ID_PREFIX + 28, SERVER_INSTANCE_ID_PREFIX + 31)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 20, SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 26, + SERVER_INSTANCE_ID_PREFIX + 29, SERVER_INSTANCE_ID_PREFIX + 32)); + + existingInstancePartitions = new InstancePartitions("existing"); + existingInstancePartitions.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 15, SERVER_INSTANCE_ID_PREFIX + 18)); + existingInstancePartitions.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 16, SERVER_INSTANCE_ID_PREFIX + 19)); + existingInstancePartitions.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17, SERVER_INSTANCE_ID_PREFIX + 20)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions, + preConfigured); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + /* + * From 18 instances in 3 replicas to 15 instance in 3 replicas + * Pre-configured partitioning: + * RG1 RG2 RG3 + * Host 18 19 20 + * Host 21 22 23 + * Host 24 25 26 + * Host 27 28 29 + * Host 30 31 32 + * + * Existing configured partitioning: + * RG1 RG2 RG3 + * Host 0 1 2 + * Host 3 4 5 + * Host 6 7 8 + * Host 9 10 11 + * Host 15 16 17 + * Host 18 19 20 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 18 19 20 + * Host 21 22 23 + * Host 24 25 26 + * Host 27 28 29 + * Host 30 31 32 + * + */ + + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, + SERVER_INSTANCE_ID_PREFIX + 21, + SERVER_INSTANCE_ID_PREFIX + 24, + SERVER_INSTANCE_ID_PREFIX + 27, + SERVER_INSTANCE_ID_PREFIX + 30)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 19, + SERVER_INSTANCE_ID_PREFIX + 22, + SERVER_INSTANCE_ID_PREFIX + 25, + SERVER_INSTANCE_ID_PREFIX + 28, + SERVER_INSTANCE_ID_PREFIX + 31)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 20, + SERVER_INSTANCE_ID_PREFIX + 23, + SERVER_INSTANCE_ID_PREFIX + 26, + SERVER_INSTANCE_ID_PREFIX + 29, + SERVER_INSTANCE_ID_PREFIX + 32)); + + // Test instance shuffling/uplifting from 3*5 to 3*7, with some instance replacement + numPartitions = 0; + numInstancesPerPartition = 0; + numInstances = 18; + numReplicaGroups = 3; + numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + driver = new InstanceAssignmentDriver(tableConfig); + preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 15, SERVER_INSTANCE_ID_PREFIX + 18)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 16, SERVER_INSTANCE_ID_PREFIX + 19)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17, SERVER_INSTANCE_ID_PREFIX + 20)); + + existingInstancePartitions = new InstancePartitions("existing"); + existingInstancePartitions.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9)); + existingInstancePartitions.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10)); + existingInstancePartitions.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 14, SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions, + preConfigured); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + /* + * uplift from 15 instances in 3 replicas to 21 instance in 3 replicas + * Pre-configured partitioning: + * RG1 RG2 RG3 + * Host 0 1 2 + * Host 3 4 5 + * Host 6 7 8 + * Host 9 10 11 + * Host 15 16 17 + * Host 18 19 20 + * + * Existing configured partitioning: + * RG1 RG2 RG3 + * Host 0 6 2 + * Host 12 7 14 + * Host 1 4 5 + * Host 3 13 8 + * Host 9 10 11 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 0 1 2 + * Host 6 7 8 + * Host 3 4 5 + * Host 15 16 17 + * Host 9 10 11 + * Host 18 19 20 + */ + + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 3, + SERVER_INSTANCE_ID_PREFIX + 15, + SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 18)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 16, + SERVER_INSTANCE_ID_PREFIX + 10, + SERVER_INSTANCE_ID_PREFIX + 19)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 17, + SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 20)); + + // Test instance shuffling/uplifting from 3*5 to 4*6 + numPartitions = 0; + numInstancesPerPartition = 0; + numInstances = 24; + numReplicaGroups = 4; + numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + driver = new InstanceAssignmentDriver(tableConfig); + preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 16)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14, SERVER_INSTANCE_ID_PREFIX + 17)); + preConfigured.setInstances(0, 3, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, SERVER_INSTANCE_ID_PREFIX + 19, SERVER_INSTANCE_ID_PREFIX + 20, + SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 23)); + + existingInstancePartitions = new InstancePartitions("existing"); + existingInstancePartitions.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9)); + existingInstancePartitions.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 7, SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10)); + existingInstancePartitions.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 14, SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions, + preConfigured); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + /* + * Test instance shuffling/uplifting from 3*5 to 4*6 + * Pre-configured partitioning: + * RG1 RG2 RG3 RG4 + * Host 0 1 2 18 + * Host 3 4 5 19 + * Host 6 7 8 20 + * Host 9 10 11 21 + * Host 12 13 14 22 + * Host 15 16 17 23 + * + * Existing configured partitioning: + * RG1 RG2 RG3 + * Host 0 6 2 + * Host 12 7 14 + * Host 1 4 5 + * Host 3 13 8 + * Host 9 10 11 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 0 1 2 18 + * Host 12 13 14 22 + * Host 3 4 5 19 + * Host 6 7 8 20 + * Host 9 10 11 21 + * Host 15 16 17 23 + */ + + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 12, + SERVER_INSTANCE_ID_PREFIX + 3, + SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 15)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 13, + SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, + SERVER_INSTANCE_ID_PREFIX + 16)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 14, + SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 17)); + assertEquals(instancePartitions.getInstances(0, 3), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, + SERVER_INSTANCE_ID_PREFIX + 22, + SERVER_INSTANCE_ID_PREFIX + 19, + SERVER_INSTANCE_ID_PREFIX + 20, + SERVER_INSTANCE_ID_PREFIX + 21, + SERVER_INSTANCE_ID_PREFIX + 23)); + + // Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling of instances + numPartitions = 0; + numInstancesPerPartition = 0; + numInstances = 12; + numReplicaGroups = 3; + numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + driver = new InstanceAssignmentDriver(tableConfig); + preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 14)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 13, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, SERVER_INSTANCE_ID_PREFIX + 19, SERVER_INSTANCE_ID_PREFIX + 20, + SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 23)); + + existingInstancePartitions = new InstancePartitions("existing"); + existingInstancePartitions.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, + SERVER_INSTANCE_ID_PREFIX + 12, + SERVER_INSTANCE_ID_PREFIX + 3, + SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 15)); + existingInstancePartitions.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 13, + SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, + SERVER_INSTANCE_ID_PREFIX + 16)); + existingInstancePartitions.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 14, + SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 17)); + existingInstancePartitions.setInstances(0, 3, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, + SERVER_INSTANCE_ID_PREFIX + 22, + SERVER_INSTANCE_ID_PREFIX + 19, + SERVER_INSTANCE_ID_PREFIX + 20, + SERVER_INSTANCE_ID_PREFIX + 21, + SERVER_INSTANCE_ID_PREFIX + 23)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions, + preConfigured); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + /* + * Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling of instances + * Pre-configured partitioning: + * RG2 RG3 RG4 + * Host 1 2 18 + * Host 4 22 19 + * Host 7 13 20 + * Host 10 11 21 + * Host 14 17 23 + * + * Existing configured partitioning: + * RG1 RG2 RG3 RG4 + * Host 0 1 2 18 + * Host 3 4 5 19 + * Host 6 7 8 20 + * Host 9 10 11 21 + * Host 12 13 14 22 + * Host 15 16 17 23 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 1 2 18 + * Host 10 11 21 + * Host 7 13 20 + * Host 14 17 23 + */ + + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 10, + SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 14)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 13, + SERVER_INSTANCE_ID_PREFIX + 17)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18, + SERVER_INSTANCE_ID_PREFIX + 21, + SERVER_INSTANCE_ID_PREFIX + 20, + SERVER_INSTANCE_ID_PREFIX + 23)); + + + // upscale 3*3 to 3*5 + numPartitions = 0; + numInstancesPerPartition = 0; + numInstances = 15; + numReplicaGroups = 3; + numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + driver = new InstanceAssignmentDriver(tableConfig); + preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15)); + + existingInstancePartitions = new InstancePartitions("existing"); + existingInstancePartitions.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 3)); + + existingInstancePartitions.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 6)); + + existingInstancePartitions.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 9)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions, + preConfigured); + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + /* + * Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling of instances + * Pre-configured partitioning: + * RG2 RG3 RG4 + * Host 1 2 3 + * Host 4 5 6 + * Host 7 8 9 + * Host 10 11 12 + * Host 13 14 15 + * + * Existing configured partitioning: + * RG1 RG2 RG3 + * Host 1 2 3 + * Host 4 5 6 + * Host 7 8 9 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 1 2 3 + * Host 4 5 6 + * Host 7 8 9 + * Host 10 11 12 + * Host 13 14 15 + */ + + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, + SERVER_INSTANCE_ID_PREFIX + 4, + SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, + SERVER_INSTANCE_ID_PREFIX + 13)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, + SERVER_INSTANCE_ID_PREFIX + 5, + SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, + SERVER_INSTANCE_ID_PREFIX + 14)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, + SERVER_INSTANCE_ID_PREFIX + 6, + SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 12, + SERVER_INSTANCE_ID_PREFIX + 15)); + + // downscale 3*5 to 3*3 + numPartitions = 0; + numInstancesPerPartition = 0; + numInstances = 9; + numReplicaGroups = 3; + numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + driver = new InstanceAssignmentDriver(tableConfig); + + preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9)); + + existingInstancePartitions = new InstancePartitions("existing"); + existingInstancePartitions.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7, + SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13)); + existingInstancePartitions.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8, + SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14)); + existingInstancePartitions.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9, + SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions, + preConfigured); + + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + + /* + * Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling of instances + * Pre-configured partitioning: + * RG2 RG3 RG4 + * Host 1 2 3 + * Host 4 5 6 + * Host 7 8 9 + * + * Existing configured partitioning: + * RG1 RG2 RG3 + * Host 1 2 3 + * Host 4 5 6 + * Host 7 8 9 + * Host 10 11 12 + * Host 13 14 15 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 1 2 3 + * Host 4 5 6 + * Host 7 8 9 + */ + + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9)); + + // replace instance 5 with instance 11 + numPartitions = 0; + numInstancesPerPartition = 0; + numInstances = 9; + numReplicaGroups = 3; + numInstancesPerReplicaGroup = numInstances / numReplicaGroups; + tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null); + replicaPartitionConfig = + new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, numInstancesPerReplicaGroup, numPartitions, + numInstancesPerPartition, false, null); + + tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(), + new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig, + InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString()))) + .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured")).build(); + driver = new InstanceAssignmentDriver(tableConfig); + + preConfigured = new InstancePartitions("preConfigured"); + preConfigured.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7)); + preConfigured.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 8)); + preConfigured.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9)); + + existingInstancePartitions = new InstancePartitions("existing"); + existingInstancePartitions.setInstances(0, 0, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7)); + existingInstancePartitions.setInstances(0, 1, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 5, SERVER_INSTANCE_ID_PREFIX + 8)); + existingInstancePartitions.setInstances(0, 2, + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9)); + + instancePartitions = + driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existingInstancePartitions, + preConfigured); + + assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups); + assertEquals(instancePartitions.getNumPartitions(), 1); + + /* + * Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling of instances + * Pre-configured partitioning: + * RG2 RG3 RG4 + * Host 1 2 3 + * Host 4 11 6 + * Host 7 8 9 + * + * Existing configured partitioning: + * RG2 RG3 RG4 + * Host 1 2 3 + * Host 4 5 6 + * Host 7 8 9 + * + * Final assignment for this table: + * RG1 RG2 RG3 + * Host 1 2 3 + * Host 4 11 6 + * Host 7 8 9 + */ + + // Verifying the final configuration after downlifting + assertEquals(instancePartitions.getInstances(0, 0), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX + 4, SERVER_INSTANCE_ID_PREFIX + 7)); + assertEquals(instancePartitions.getInstances(0, 1), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 8)); + assertEquals(instancePartitions.getInstances(0, 2), + Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX + 9)); + } + @Test public void testPoolBased() { // 10 instances in 2 pools, each with 5 instances diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java index e72d066bc98..7c9ea081d4a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java @@ -85,6 +85,8 @@ public static class Cluster { public static final String UPDATE_USER = "UpdateUser"; public static final String UPDATE_ZNODE = "UpdateZnode"; public static final String UPLOAD_SEGMENT = "UploadSegment"; + public static final String GET_INSTANCE_PARTITIONS = "GetInstancePartitions"; + public static final String UPDATE_INSTANCE_PARTITIONS = "UpdateInstancePartitions"; } // Action names for table diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 71e47c7c62c..aad0a992e74 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -766,11 +766,25 @@ static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) { tableConfig.getInstanceAssignmentConfigMap())) { return; } - for (InstancePartitionsType instancePartitionsType : tableConfig.getInstancePartitionsMap().keySet()) { - Preconditions.checkState( - !tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()), - String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s", - instancePartitionsType)); + + for (InstancePartitionsType instancePartitionsType : InstancePartitionsType.values()) { + if (tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString())) { + InstanceAssignmentConfig instanceAssignmentConfig = + tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString()); + if (instanceAssignmentConfig.getPartitionSelector() + == InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR) { + Preconditions.checkState( + tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), + String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap needed for %s, as " + + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used", + instancePartitionsType)); + } else { + Preconditions.checkState( + !tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType), + String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s", + instancePartitionsType)); + } + } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java index 186f545cea3..391ba4812d3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java @@ -82,6 +82,7 @@ public InstanceReplicaGroupPartitionConfig getReplicaGroupPartitionConfig() { } public enum PartitionSelector { - FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR + FD_AWARE_INSTANCE_PARTITION_SELECTOR, INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR, + MIRROR_SERVER_SET_PARTITION_SELECTOR } }