Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.utils.config;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand All @@ -27,6 +28,7 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.helix.ExtraInstanceConfig;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;

Expand Down Expand Up @@ -159,6 +161,64 @@ public static void updateHelixInstanceConfig(InstanceConfig instanceConfig, Inst
}
}

/**
* Converts a Helix InstanceConfig to a pinot-spi Instance. This is the reverse of
* {@link #toHelixInstanceConfig(Instance)}. Extracts host, port, type, tags, pools,
* and optional port fields from the InstanceConfig's ZNRecord.
*/
public static Instance toInstance(InstanceConfig instanceConfig) {
String instanceId = instanceConfig.getInstanceName();
ZNRecord znRecord = instanceConfig.getRecord();
Map<String, String> simpleFields = znRecord.getSimpleFields();

// Extract type from instance ID prefix (the only reliable source for type)
InstanceType type;
if (instanceId.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
type = InstanceType.SERVER;
} else if (instanceId.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) {
type = InstanceType.BROKER;
} else if (instanceId.startsWith(Helix.PREFIX_OF_CONTROLLER_INSTANCE)) {
type = InstanceType.CONTROLLER;
} else if (instanceId.startsWith(Helix.PREFIX_OF_MINION_INSTANCE)) {
type = InstanceType.MINION;
} else {
throw new IllegalArgumentException("Unknown instance type for: " + instanceId);
}

// Read host/port from ZNRecord simple fields (source of truth, updated by updateHelixInstanceConfig)
// Backward-compatible with legacy hostname of format 'Server_<hostname>'
String host = instanceConfig.getHostName();
Comment thread
suvodeep-pyne marked this conversation as resolved.
if (type == InstanceType.SERVER && host.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
host = host.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
}
int port = Integer.parseInt(instanceConfig.getPort());

Comment thread
suvodeep-pyne marked this conversation as resolved.
// Extract tags
List<String> tags = instanceConfig.getTags();

// Extract pools
Map<String, Integer> pools = null;
Map<String, String> poolMap = znRecord.getMapField(POOL_KEY);
if (MapUtils.isNotEmpty(poolMap)) {
pools = new HashMap<>();
for (Map.Entry<String, String> entry : poolMap.entrySet()) {
pools.put(entry.getKey(), Integer.parseInt(entry.getValue()));
}
}

// Extract optional ports
int grpcPort = Integer.parseInt(simpleFields.getOrDefault(Helix.Instance.GRPC_PORT_KEY, "-1"));
int adminPort = Integer.parseInt(simpleFields.getOrDefault(Helix.Instance.ADMIN_PORT_KEY, "-1"));
int queryServicePort = Integer.parseInt(
simpleFields.getOrDefault(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_SERVICE_PORT_KEY, "-1"));
int queryMailboxPort = Integer.parseInt(
simpleFields.getOrDefault(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, "-1"));
boolean queriesDisabled = Boolean.parseBoolean(simpleFields.getOrDefault(Helix.QUERIES_DISABLED, "false"));

return new Instance(host, port, type, tags, pools, grpcPort, adminPort, queryServicePort, queryMailboxPort,
queriesDisabled);
}

public static String getInstanceBaseUri(InstanceConfig instanceConfig) {
Map<String, String> fieldMap = instanceConfig.getRecord().getSimpleFields();
String hostName = instanceConfig.getHostName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,77 @@ public void testToHelixInstanceConfig() {
assertNull(znRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED));
}

@Test
public void testToInstance() {
// Controller — minimal, no optional fields
Instance controller = new Instance("localhost", 1234, InstanceType.CONTROLLER, null, null, 0, 0, 0, 0, false);
Instance rtController = InstanceUtils.toInstance(InstanceUtils.toHelixInstanceConfig(controller));
assertInstanceEquals(rtController, controller);

// Broker — with tags
List<String> brokerTags = Collections.singletonList("DefaultTenant_BROKER");
Instance broker = new Instance("localhost", 2345, InstanceType.BROKER, brokerTags, null, 0, 0, 0, 0, false);
Instance rtBroker = InstanceUtils.toInstance(InstanceUtils.toHelixInstanceConfig(broker));
assertInstanceEquals(rtBroker, broker);

// Server — with tags, pools, all optional ports, queriesDisabled
List<String> serverTags = Arrays.asList("T1_OFFLINE", "T2_REALTIME");
Map<String, Integer> poolMap = new TreeMap<>();
poolMap.put("T1_OFFLINE", 0);
poolMap.put("T2_REALTIME", 1);
Instance server = new Instance("localhost", 3456, InstanceType.SERVER, serverTags, poolMap, 123, 234, 345, 456,
true);
Instance rtServer = InstanceUtils.toInstance(InstanceUtils.toHelixInstanceConfig(server));
assertInstanceEquals(rtServer, server);

// Minion — with tags, no pools
List<String> minionTags = Collections.singletonList("minion_untagged");
Instance minion = new Instance("localhost", 4567, InstanceType.MINION, minionTags, null, 0, 0, 0, 0, false);
Instance rtMinion = InstanceUtils.toInstance(InstanceUtils.toHelixInstanceConfig(minion));
assertInstanceEquals(rtMinion, minion);

// Server with legacy 'Server_<hostname>' format in hostName field
InstanceConfig legacyServerConfig = new InstanceConfig("Server_localhost_3456");
legacyServerConfig.setHostName("Server_localhost");
legacyServerConfig.setPort("3456");
legacyServerConfig.addTag("T1_OFFLINE");
Instance legacyServer = InstanceUtils.toInstance(legacyServerConfig);
assertEquals(legacyServer.getHost(), "localhost");
assertEquals(legacyServer.getPort(), 3456);
assertEquals(legacyServer.getType(), InstanceType.SERVER);

// Unknown instance type prefix
InstanceConfig unknownConfig = new InstanceConfig("Unknown_localhost_1234");
unknownConfig.setHostName("localhost");
unknownConfig.setPort("1234");
try {
InstanceUtils.toInstance(unknownConfig);
throw new AssertionError("Expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Unknown_localhost_1234"));
}
}

private void assertInstanceEquals(Instance actual, Instance expected) {
assertEquals(actual.getHost(), expected.getHost());
assertEquals(actual.getPort(), expected.getPort());
assertEquals(actual.getType(), expected.getType());
// toHelixInstanceConfig converts null tags to empty list
List<String> expectedTags = expected.getTags() != null ? expected.getTags() : Collections.emptyList();
assertEquals(actual.getTags(), expectedTags);
assertEquals(actual.getPools(), expected.getPools());
// toHelixInstanceConfig strips ports <= 0, so toInstance reads them back as -1
int expectedGrpc = expected.getGrpcPort() > 0 ? expected.getGrpcPort() : -1;
int expectedAdmin = expected.getAdminPort() > 0 ? expected.getAdminPort() : -1;
int expectedQueryService = expected.getQueryServicePort() > 0 ? expected.getQueryServicePort() : -1;
int expectedQueryMailbox = expected.getQueryMailboxPort() > 0 ? expected.getQueryMailboxPort() : -1;
assertEquals(actual.getGrpcPort(), expectedGrpc);
assertEquals(actual.getAdminPort(), expectedAdmin);
assertEquals(actual.getQueryServicePort(), expectedQueryService);
assertEquals(actual.getQueryMailboxPort(), expectedQueryMailbox);
assertEquals(actual.isQueriesDisabled(), expected.isQueriesDisabled());
}

@Test
public void testUpdateHelixInstanceConfig() {
Instance instance =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.exception.ConfigValidationException;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.JsonUtils;
Expand Down Expand Up @@ -246,6 +247,8 @@ public SuccessResponse addInstance(
return new SuccessResponse(response.getMessage());
} catch (ClientErrorException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), e.getResponse().getStatus());
} catch (ConfigValidationException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Failed to create instance: " + instanceId,
Response.Status.INTERNAL_SERVER_ERROR, e);
Expand Down Expand Up @@ -416,6 +419,8 @@ public SuccessResponse updateInstance(
return new SuccessResponse(response.getMessage());
} catch (ClientErrorException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), e.getResponse().getStatus());
} catch (ConfigValidationException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, "Failed to update instance: " + instanceName,
Response.Status.INTERNAL_SERVER_ERROR, e);
Expand Down Expand Up @@ -453,6 +458,8 @@ public SuccessResponse updateInstanceTags(
return new SuccessResponse(response.getMessage());
} catch (ClientErrorException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), e.getResponse().getStatus());
} catch (ConfigValidationException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e);
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER,
String.format("Failed to update instance: %s with tags: %s", instanceName, tags),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
import org.apache.pinot.spi.controller.ControllerJobType;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.exception.ConfigValidationException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
Expand Down Expand Up @@ -360,6 +361,8 @@ public CopyTableResponse copyTable(

_pinotHelixResourceManager.addSchema(schema, true, false);
LOGGER.info("[copyTable] Successfully added schema for table: {}", tableName);
TableConfigValidationUtils.validateTableConfig(
realtimeTableConfig, schema, null, _pinotHelixResourceManager, _controllerConf, _pinotTaskManager);
Comment thread
suvodeep-pyne marked this conversation as resolved.
// Add the table with designated starting kafka offset and segment sequence number to create consuming segments
_pinotHelixResourceManager.addTable(realtimeTableConfig, streamMetadataList);
LOGGER.info("[copyTable] Successfully added table config: {} with designated high watermark", tableName);
Expand All @@ -374,6 +377,8 @@ public CopyTableResponse copyTable(
response.setWatermarkInductionResult(watermarkInductionResult);
}
return response;
} catch (ConfigValidationException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST, e);
} catch (Exception e) {
LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
throw new ControllerApplicationException(LOGGER, "Error copying table: " + e.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pinot.controller.util.TaskConfigUtils;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableConfigValidatorRegistry;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
Expand Down Expand Up @@ -65,6 +66,7 @@ public static void validateTableConfig(TableConfig tableConfig, Schema schema,
validateInstanceAssignment(resourceManager, tableConfig);
resourceManager.validateTableTenantConfig(tableConfig);
resourceManager.validateTableTaskMinionInstanceTagConfig(tableConfig);
TableConfigValidatorRegistry.validate(tableConfig, schema);
}

private static void checkHybridTableConfig(PinotHelixResourceManager resourceManager, TableConfig tableConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.TableConfigs;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableConfigValidatorRegistry;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
Expand Down Expand Up @@ -535,6 +536,7 @@ private void validateConfig(TableConfigs tableConfigs, String database, @Nullabl
}
}
TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), schema, _pinotTaskManager, typesToSkip);
TableConfigValidatorRegistry.validate(offlineTableConfig, schema);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This validator still runs inside validateConfig(), but addConfig()/updateConfig() call tuneConfig() only afterwards. That means the SPI is evaluating a different TableConfig than the one that is actually persisted, unlike the /tables endpoints where tuning happens before validation. A validator can therefore reject or allow the same mutation depending on which API the caller uses. If this SPI is meant to protect the stored config, it needs to run after tuning or in PinotHelixResourceManager.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch on the ordering. I investigated what tuneConfig() actually modifies:

  • applyTunerConfigs() — only indexing config (inverted indices, no-dictionary columns)
  • ensureMinReplicas() — segment replication count
  • ensureStorageQuotaConstraints() — quota config for dimension tables

None of these touch instance assignment, replica groups, pools, or server tags — the fields that config validators (like batch restart enforcement) inspect. So the pre-tuned and post-tuned configs are identical for validation purposes.

I considered moving the SPI call out of validateConfig() and into each mutation path after tuning, but that creates a maintenance risk: any new mutation endpoint that calls validateConfig() would silently skip SPI validation unless the caller remembers to invoke it separately. Keeping it in validateConfig() makes it a single checkpoint that can't be accidentally bypassed.

If a future tuner modifies fields that validators care about, the right fix at that point would be to move tuning before validateConfig() in the /tableConfigs paths (matching what /tables already does), rather than pulling the SPI call out.

}
if (realtimeTableConfig != null) {
String realtimeRawTableName = DatabaseUtils.translateTableName(
Expand All @@ -555,6 +557,7 @@ private void validateConfig(TableConfigs tableConfigs, String database, @Nullabl
}
}
TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(), schema, _pinotTaskManager, typesToSkip);
TableConfigValidatorRegistry.validate(realtimeTableConfig, schema);
}
if (offlineTableConfig != null && realtimeTableConfig != null) {
TableConfigUtils.verifyHybridTableConfigs(rawTableName, offlineTableConfig, realtimeTableConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
import org.apache.pinot.spi.config.instance.InstanceConfigValidatorRegistry;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableStatsHumanReadable;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -555,6 +556,8 @@ public synchronized PinotResourceManagerResponse addInstance(Instance instance,
throw new ClientErrorException("Instance: " + instanceId + " already exists", Response.Status.CONFLICT);
}

InstanceConfigValidatorRegistry.validate(instance);

instanceConfig = InstanceUtils.toHelixInstanceConfig(instance);
_helixAdmin.addInstance(_helixClusterName, instanceConfig);

Expand Down Expand Up @@ -591,6 +594,8 @@ public synchronized PinotResourceManagerResponse updateInstance(String instanceI
throw new NotFoundException("Failed to find instance config for instance: " + instanceId);
}

InstanceConfigValidatorRegistry.validate(newInstance);

List<String> newTags = newInstance.getTags();
List<String> oldTags = instanceConfig.getTags();
InstanceUtils.updateHelixInstanceConfig(instanceConfig, newInstance);
Expand Down Expand Up @@ -635,7 +640,12 @@ public synchronized PinotResourceManagerResponse updateInstanceTags(String insta

List<String> newTags = Arrays.asList(StringUtils.split(tagsString, ','));
List<String> oldTags = instanceConfig.getTags();

// Apply new tags in-memory, validate, then persist. Safe: instanceConfig is a fresh local fetch,
// oldTags is captured above, and if validation throws the mutated config is never persisted.
instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), newTags);
InstanceConfigValidatorRegistry.validate(InstanceUtils.toInstance(instanceConfig));

if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceId), instanceConfig)) {
throw new RuntimeException("Failed to set instance config for instance: " + instanceId);
}
Expand Down
Loading
Loading