Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pre-configuration based assignment #11578

Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -124,4 +124,14 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t

return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
}

public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
// If the instance assignment config is not null and the partition selector is
// MIRROR_SERVER_SET_PARTITION_SELECTOR,
return tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString()) != null
&& InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, instancePartitionsType)
.getPartitionSelector()
== InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR;
}
}
Expand Up @@ -69,7 +69,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
import static org.apache.pinot.spi.utils.CommonConstants.*;


@Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)})
Expand Down Expand Up @@ -244,20 +244,38 @@ public Map<String, InstancePartitions> assignInstances(
private void assignInstancesForInstancePartitionsType(Map<String, InstancePartitions> instancePartitionsMap,
TableConfig tableConfig, List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
if (!TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
jasperjiaguo marked this conversation as resolved.
Show resolved Hide resolved
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
instancePartitionsMap.put(instancePartitionsType.toString(),
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName)));
return;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions));
} else {
if (InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType)) {
// fetch the existing instance partitions, if the table, this is referenced in the new instance partitions
// generation for minimum difference
InstancePartitions existingInstancePartitions = InstancePartitionsUtils.fetchInstancePartitions(
_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
instancePartitionsMap.put(instancePartitionsType.toString(),
new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions));
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
// fetch the pre-configured instance partitions, the renaming part is irrelevant as we are not really
// preserving this preConfigured, but only using it as a reference to generate the new instance partitions
InstancePartitions preConfigured =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName));
instancePartitionsMap.put(instancePartitionsType.toString(),
new InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions, preConfigured));
} else {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
instancePartitionsMap.put(instancePartitionsType.toString(),
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName)));
}
}
}

private void assignInstancesForTier(Map<String, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
Expand Down
Expand Up @@ -30,6 +30,7 @@
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -50,6 +51,8 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
Expand All @@ -68,13 +71,14 @@
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
import static org.apache.pinot.spi.utils.CommonConstants.*;


/**
Expand Down Expand Up @@ -286,6 +290,82 @@ public String getTablesOnTenant(
}
}

@GET
@Path("/tenants/{tenantName}/instancePartitions")
jasperjiaguo marked this conversation as resolved.
Show resolved Hide resolved
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_INSTANCE_PARTITIONS)
@Authenticate(AccessType.READ)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get the instance partitions of a tenant")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Success", response = InstancePartitions.class),
@ApiResponse(code = 404, message = "Instance partitions not found")})
public InstancePartitions getInstancePartitions(
@ApiParam(value = "Tenant name ", required = true) @PathParam("tenantName") String tenantName,
@ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)", required = true,
allowableValues = "OFFLINE, CONSUMING, COMPLETED")
@QueryParam("instancePartitionType") String instancePartitionType) {
jasperjiaguo marked this conversation as resolved.
Show resolved Hide resolved
String tenantNameWithType = InstancePartitionsType.valueOf(instancePartitionType)
.getInstancePartitionsName(tenantName);
InstancePartitions instancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
tenantNameWithType);

if (instancePartitions == null) {
throw new ControllerApplicationException(LOGGER,
String.format("Failed to find the instance partitions for %s", tenantNameWithType),
Response.Status.NOT_FOUND);
} else {
return instancePartitions;
}
}

@PUT
@Path("/tenants/{tenantName}/instancePartitions")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_INSTANCE_PARTITIONS)
@Authenticate(AccessType.UPDATE)
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Update an instance partition for a server type in a tenant")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Success", response = InstancePartitions.class),
@ApiResponse(code = 400, message = "Failed to deserialize/validate the instance partitions"),
@ApiResponse(code = 500, message = "Error updating the tenant")})
public InstancePartitions assignInstancesPartitionMap(
@ApiParam(value = "Tenant name ", required = true) @PathParam("tenantName") String tenantName,
@ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)", required = true,
allowableValues = "OFFLINE, CONSUMING, COMPLETED")
@QueryParam("instancePartitionType") String instancePartitionType,
String instancePartitionsStr) {
jasperjiaguo marked this conversation as resolved.
Show resolved Hide resolved
InstancePartitions instancePartitions;
try {
instancePartitions = JsonUtils.stringToObject(instancePartitionsStr, InstancePartitions.class);
} catch (IOException e) {
throw new ControllerApplicationException(LOGGER, "Failed to deserialize the instance partitions",
Response.Status.BAD_REQUEST);
}

String inputTenantName = InstancePartitionsType.valueOf(instancePartitionType)
.getInstancePartitionsName(tenantName);

if (!instancePartitions.getInstancePartitionsName().equals(inputTenantName)) {
throw new ControllerApplicationException(LOGGER, "Instance partitions name mismatch, expected: "
+ inputTenantName
+ ", got: " + instancePartitions.getInstancePartitionsName(), Response.Status.BAD_REQUEST);
}

persistInstancePartitionsHelper(instancePartitions);
return instancePartitions;
}

private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) {
try {
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
instancePartitions);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions",
Response.Status.INTERNAL_SERVER_ERROR, e);
}
}

private String getTablesServedFromServerTenant(String tenantName) {
Set<String> tables = new HashSet<>();
ObjectNode resourceGetRet = JsonUtils.newObjectNode();
Expand Down
Expand Up @@ -1735,20 +1735,29 @@ private void assignInstances(TableConfig tableConfig, boolean override) {
for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
boolean hasPreConfiguredInstancePartitions =
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType);
boolean isPreConfigurationBasedAssignment =
InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig, instancePartitionsType);
InstancePartitions instancePartitions;
if (!hasPreConfiguredInstancePartitions) {
instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null);
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
} else {
String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
instancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore, referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
if (isPreConfigurationBasedAssignment) {
InstancePartitions preConfiguredInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null,
preConfiguredInstancePartitions);
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
jasperjiaguo marked this conversation as resolved.
Show resolved Hide resolved
} else {
instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
}
}
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
}
}

Expand Down
Expand Up @@ -60,19 +60,31 @@ public InstancePartitions assignInstances(InstancePartitionsType instancePartiti
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
assignmentConfig, instanceConfigs, existingInstancePartitions);
assignmentConfig, instanceConfigs, existingInstancePartitions, null);
}

public InstancePartitions assignInstances(InstancePartitionsType instancePartitionsType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can the existing assignInstances(InstancePartitionsType instancePartitionsType, List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions) call this function since the remaining setup is the same? just to avoid too much code duplication

List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions existingInstancePartitions, @Nullable
InstancePartitions preConfiguredInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig, instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
assignmentConfig, instanceConfigs, existingInstancePartitions, preConfiguredInstancePartitions);
}

public InstancePartitions assignInstances(String tierName, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions, InstanceAssignmentConfig instanceAssignmentConfig) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(), tierName),
instanceAssignmentConfig, instanceConfigs, existingInstancePartitions);
instanceAssignmentConfig, instanceConfigs, existingInstancePartitions, null);
}

private InstancePartitions getInstancePartitions(String instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions) {
@Nullable InstancePartitions existingInstancePartitions,
@Nullable InstancePartitions preConfiguredInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}", instancePartitionsName, tableNameWithType);

Expand All @@ -93,7 +105,8 @@ private InstancePartitions getInstancePartitions(String instancePartitionsName,

InstancePartitionSelector instancePartitionSelector =
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions);
instanceAssignmentConfig.getReplicaGroupPartitionConfig(), tableNameWithType, existingInstancePartitions,
preConfiguredInstancePartitions);
InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap, instancePartitions);
return instancePartitions;
Expand Down
Expand Up @@ -31,7 +31,14 @@ private InstancePartitionSelectorFactory() {

public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
InstancePartitions existingInstancePartitions
InstancePartitions existingInstancePartitions) {
return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, null);
}

public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig, String tableNameWithType,
InstancePartitions existingInstancePartitions, InstancePartitions preConfiguredInstancePartitions
) {
switch (partitionSelector) {
case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
Expand All @@ -40,6 +47,9 @@ public static InstancePartitionSelector getInstance(InstanceAssignmentConfig.Par
case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
return new InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
case MIRROR_SERVER_SET_PARTITION_SELECTOR:
return new MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, preConfiguredInstancePartitions);
default:
throw new IllegalStateException("Unexpected PartitionSelector: " + partitionSelector + ", should be from"
+ Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
Expand Down