From 10ee3172744a236cefaec8a43acc392f0f7f6b3c Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Tue, 12 May 2026 02:48:32 -0700 Subject: [PATCH] Add realtime consuming segment index profile --- .../pinot/common/metrics/ServerMeter.java | 3 + .../utils/config/TableConfigSerDeUtils.java | 13 +- .../config/TableConfigSerDeUtilsTest.java | 43 +++ .../realtime/RealtimeSegmentDataManager.java | 10 +- ...sumingSegmentIndexProfileRealtimeTest.java | 360 ++++++++++++++++++ ...CustomDataQueryClusterIntegrationTest.java | 6 + .../StatelessRealtimeSegmentWriter.java | 8 +- .../segment/local/utils/TableConfigUtils.java | 324 ++++++++++++++++ ...ConfigConsumingSegmentIndexConfigTest.java | 342 +++++++++++++++++ .../table/ConsumingSegmentFieldConfig.java | 87 +++++ .../table/ConsumingSegmentIndexConfig.java | 64 ++++ .../spi/config/table/RealtimeConfig.java | 64 ++++ .../pinot/spi/config/table/TableConfig.java | 49 ++- .../spi/utils/builder/TableConfigBuilder.java | 10 +- .../consumingSegmentIndexProfile/README.md | 77 ++++ .../userEvents_realtime_table_config.json | 51 +++ 16 files changed, 1504 insertions(+), 7 deletions(-) create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentIndexProfileRealtimeTest.java create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentIndexConfigTest.java create mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ConsumingSegmentFieldConfig.java create mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ConsumingSegmentIndexConfig.java create mode 100644 pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RealtimeConfig.java create mode 100644 pinot-tools/src/main/resources/examples/stream/consumingSegmentIndexProfile/README.md create mode 100644 pinot-tools/src/main/resources/examples/stream/consumingSegmentIndexProfile/userEvents_realtime_table_config.json diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 49ea60bb56cd..c6f2ab2c6fc8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -102,6 +102,9 @@ public enum ServerMeter implements AbstractMetrics.Meter { SEGMENT_DOWNLOAD_FROM_REMOTE_FAILURES("segments", false), SEGMENT_DOWNLOAD_FROM_PEERS_FAILURES("segments", false), SEGMENT_BUILD_FAILURE("segments", false), + /// Counts consuming-segment-build events where `realtimeConfig.consumingSegmentIndexConfig` could not be applied + /// and the server fell back to the persisted segment shape. Emitted per table. + CONSUMING_SEGMENT_INDEX_PROFILE_FALLBACK("events", false), SEGMENT_UPLOAD_FAILURE("segments", false), SEGMENT_UPLOAD_SUCCESS("segments", false), // Emitted only by Server to Deep-store segment uploader. diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java index a24583891fd7..87153f085ab2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtils.java @@ -33,6 +33,7 @@ import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.QuotaConfig; +import org.apache.pinot.spi.config.table.RealtimeConfig; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; @@ -112,6 +113,12 @@ public static TableConfig fromZNRecord(ZNRecord znRecord) queryConfig = JsonUtils.stringToObject(queryConfigString, 
QueryConfig.class); } + RealtimeConfig realtimeConfig = null; + String realtimeConfigString = simpleFields.get(TableConfig.REALTIME_CONFIG_KEY); + if (realtimeConfigString != null) { + realtimeConfig = JsonUtils.stringToObject(realtimeConfigString, RealtimeConfig.class); + } + Map instanceAssignmentConfigMap = null; String instanceAssignmentConfigMapString = simpleFields.get(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY); if (instanceAssignmentConfigMapString != null) { @@ -198,7 +205,7 @@ public static TableConfig fromZNRecord(ZNRecord znRecord) new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig, dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList, isDimTable, - tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap, tableSamplerConfigs); + tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap, tableSamplerConfigs, realtimeConfig); tableConfig.setDescription(description); tableConfig.setTags(tags); return tableConfig; @@ -234,6 +241,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig) if (queryConfig != null) { simpleFields.put(TableConfig.QUERY_CONFIG_KEY, queryConfig.toJsonString()); } + RealtimeConfig realtimeConfig = tableConfig.getRealtimeConfig(); + if (realtimeConfig != null) { + simpleFields.put(TableConfig.REALTIME_CONFIG_KEY, realtimeConfig.toJsonString()); + } Map instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); if (instanceAssignmentConfigMap != null) { simpleFields.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY, diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java index 796f26b02db3..7d655f2ea8f8 100644 --- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java @@ -28,11 +28,14 @@ import java.util.Map; import org.apache.pinot.common.tier.TierFactory; import org.apache.pinot.spi.config.table.CompletionConfig; +import org.apache.pinot.spi.config.table.ConsumingSegmentFieldConfig; +import org.apache.pinot.spi.config.table.ConsumingSegmentIndexConfig; import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.QuotaConfig; +import org.apache.pinot.spi.config.table.RealtimeConfig; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; @@ -214,6 +217,28 @@ public void testSerDe() assertEquals(tableConfigToCompare, tableConfig); checkQueryConfig(tableConfigToCompare); } + { + // With realtime config + ObjectNode indexes = JsonUtils.newObjectNode(); + indexes.set("inverted", JsonUtils.newObjectNode().put("enabled", true)); + RealtimeConfig realtimeConfig = new RealtimeConfig(new ConsumingSegmentIndexConfig(List.of( + new ConsumingSegmentFieldConfig("column1", FieldConfig.EncodingType.DICTIONARY, indexes)))); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME) + .setTableName("testRealtimeTable") + .setRealtimeConfig(realtimeConfig) + .build(); + + checkRealtimeConfig(tableConfig); + + // Serialize then de-serialize + TableConfig tableConfigToCompare = JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class); + assertEquals(tableConfigToCompare, tableConfig); + checkRealtimeConfig(tableConfigToCompare); + + tableConfigToCompare = 
TableConfigSerDeUtils.fromZNRecord(TableConfigSerDeUtils.toZNRecord(tableConfig)); + assertEquals(tableConfigToCompare, tableConfig); + checkRealtimeConfig(tableConfigToCompare); + } { // With instance assignment config InstanceAssignmentConfig instanceAssignmentConfig = @@ -410,6 +435,7 @@ private void checkDefaultTableConfig(TableConfig tableConfig) { assertFalse(tableConfigJson.has(TableConfig.TASK_CONFIG_KEY)); assertFalse(tableConfigJson.has(TableConfig.ROUTING_CONFIG_KEY)); assertFalse(tableConfigJson.has(TableConfig.QUERY_CONFIG_KEY)); + assertFalse(tableConfigJson.has(TableConfig.REALTIME_CONFIG_KEY)); assertFalse(tableConfigJson.has(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY)); assertFalse(tableConfigJson.has(TableConfig.SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY)); assertFalse(tableConfigJson.has(TableConfig.FIELD_CONFIG_LIST_KEY)); @@ -430,6 +456,23 @@ private void checkTenantConfigWithoutTagOverride(TableConfig tableConfig) { assertNull(tenantConfig.getTagOverrideConfig()); } + private void checkRealtimeConfig(TableConfig tableConfig) { + RealtimeConfig realtimeConfig = tableConfig.getRealtimeConfig(); + assertNotNull(realtimeConfig); + ConsumingSegmentIndexConfig consumingSegmentIndexConfig = realtimeConfig.getConsumingSegmentIndexConfig(); + assertNotNull(consumingSegmentIndexConfig); + List fieldConfigList = consumingSegmentIndexConfig.getFieldConfigList(); + assertNotNull(fieldConfigList); + assertEquals(fieldConfigList.size(), 1); + ConsumingSegmentFieldConfig fieldConfig = fieldConfigList.get(0); + assertEquals(fieldConfig.getName(), "column1"); + assertEquals(fieldConfig.getEncodingType(), FieldConfig.EncodingType.DICTIONARY); + assertTrue(fieldConfig.getIndexes().has("inverted")); + + ObjectNode tableConfigJson = (ObjectNode) tableConfig.toJsonNode(); + assertTrue(tableConfigJson.has(TableConfig.REALTIME_CONFIG_KEY)); + } + private void checkTenantConfigWithTagOverride(TableConfig tableConfig) { TenantConfig tenantConfig = 
tableConfig.getTenantConfig(); assertNotNull(tenantConfig); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index fa423c7d081c..de9cd51cc84e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -69,6 +69,7 @@ import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager; import org.apache.pinot.segment.local.upsert.UpsertContext; import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.SegmentVersion; @@ -1821,9 +1822,14 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf _isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation(); _defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled(); - // Start new realtime segment + // Start new realtime segment. The dispatch helper keeps the existing IndexLoadingConfig-driven path when no + // consuming segment index profile is configured, and builds a mutable-only profile view when one is configured. 
String consumerDir = realtimeTableDataManager.getConsumerDir(); - RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig) + RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = + TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig, _segmentLogger, + () -> _serverMetrics.addMeteredTableValue(_tableNameWithType, + ServerMeter.CONSUMING_SEGMENT_INDEX_PROFILE_FALLBACK, 1L)); + realtimeSegmentConfigBuilder .setTableNameWithType(_tableNameWithType) .setSegmentName(_segmentNameStr) .setStreamName(streamTopic) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentIndexProfileRealtimeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentIndexProfileRealtimeTest.java new file mode 100644 index 000000000000..c6024101b171 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/ConsumingSegmentIndexProfileRealtimeTest.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests.custom; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.File; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; +import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; +import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.spi.config.table.ConsumingSegmentFieldConfig; +import org.apache.pinot.spi.config.table.ConsumingSegmentIndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.RealtimeConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import 
org.apache.pinot.util.TestUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/// End-to-end coverage for `realtimeConfig.consumingSegmentIndexConfig` on a realtime table. +/// +/// The table persists `profiledString` as RAW with no inverted index. The consuming segment profile upgrades that +/// column to DICTIONARY + INVERTED only while the segment is mutable and consuming. After force-commit, the immutable +/// segment must retain the persisted RAW/no-inverted shape on disk. +@Test(suiteName = "CustomClusterIntegrationTest") +public class ConsumingSegmentIndexProfileRealtimeTest extends CustomDataQueryClusterIntegrationTest { + private static final String TABLE_NAME = "ConsumingSegmentIndexProfileRealtimeTest"; + private static final int NUM_DOCS = 200; + private static final String PROFILED_STRING_COLUMN = "profiledString"; + private static final String CONTROL_STRING_COLUMN = "controlString"; + private static final String INT_COLUMN = "intCol"; + private static final String TIME_COLUMN = "tsMillis"; + private static final String[] STRING_VALUES = {"alpha", "beta", "gamma", "delta"}; + + @Override + public String getTableName() { + return TABLE_NAME; + } + + @Override + public boolean isRealtimeTable() { + return true; + } + + @Override + protected long getCountStarResult() { + return NUM_DOCS; + } + + @Override + protected int getRealtimeSegmentFlushSize() { + return NUM_DOCS * 10; + } + + @Override + public Schema createSchema() { + return new Schema.SchemaBuilder().setSchemaName(getTableName()) + .addSingleValueDimension(PROFILED_STRING_COLUMN, FieldSpec.DataType.STRING) + .addSingleValueDimension(CONTROL_STRING_COLUMN, FieldSpec.DataType.STRING) + .addMetric(INT_COLUMN, FieldSpec.DataType.INT) + .addDateTimeField(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:MILLISECONDS") + .build(); + } + + @Override + public String getTimeColumnName() { + return TIME_COLUMN; + } + + @Override + protected String getSortedColumn() { + return null; + } + + @Override + protected List getNoDictionaryColumns() { + return List.of(PROFILED_STRING_COLUMN); + } + + @Override + protected List getInvertedIndexColumns() { + return List.of(); + } + + @Override + protected List getFieldConfigs() { + return List.of(new FieldConfig.Builder(PROFILED_STRING_COLUMN) + .withEncodingType(FieldConfig.EncodingType.RAW) + .build()); + } + + @Override + protected TableConfig createRealtimeTableConfig(File sampleAvroFile) { + AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile; + ObjectNode indexes = JsonUtils.newObjectNode(); + indexes.set("inverted", JsonUtils.newObjectNode().put("enabled", true)); + RealtimeConfig realtimeConfig = new RealtimeConfig(new ConsumingSegmentIndexConfig(List.of( + new ConsumingSegmentFieldConfig(PROFILED_STRING_COLUMN, FieldConfig.EncodingType.DICTIONARY, indexes)))); + return getTableConfigBuilder(TableType.REALTIME) + .setRealtimeConfig(realtimeConfig) + .build(); + } + + @Override + public List createAvroFiles() + throws Exception { + org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false); + org.apache.avro.Schema stringSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING); + org.apache.avro.Schema intSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT); + org.apache.avro.Schema longSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG); + avroSchema.setFields(List.of( + new org.apache.avro.Schema.Field(PROFILED_STRING_COLUMN, stringSchema, null, null), + new org.apache.avro.Schema.Field(CONTROL_STRING_COLUMN, stringSchema, null, null), + new org.apache.avro.Schema.Field(INT_COLUMN, intSchema, null, null), + new org.apache.avro.Schema.Field(TIME_COLUMN, longSchema, null, null))); + + long now = 
System.currentTimeMillis(); + try (AvroFilesAndWriters avroFilesAndWriters = createAvroFilesAndWriters(avroSchema)) { + List> writers = avroFilesAndWriters.getWriters(); + for (int i = 0; i < NUM_DOCS; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(PROFILED_STRING_COLUMN, STRING_VALUES[i % STRING_VALUES.length]); + record.put(CONTROL_STRING_COLUMN, "fixed"); + record.put(INT_COLUMN, i); + record.put(TIME_COLUMN, now + i); + writers.get(i % getNumAvroFiles()).append(record); + } + return avroFilesAndWriters.getAvroFiles(); + } + } + + @Test + public void testConsumingSegmentIndexProfileEndToEnd() + throws Exception { + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + waitForAllDocsLoaded(60_000L); + + long alphaCountBeforeCommit = queryStringCount(); + assertEquals(alphaCountBeforeCommit, (NUM_DOCS + STRING_VALUES.length - 1) / STRING_VALUES.length); + + int consumingSegmentsInspected = inspectConsumingSegments(realtimeTableName); + assertTrue(consumingSegmentsInspected > 0, + "Expected at least one consuming segment before forceCommit, got " + consumingSegmentsInspected); + + forceCommitAndWait(realtimeTableName); + long alphaCountAfterCommit = queryStringCount(); + assertEquals(alphaCountAfterCommit, alphaCountBeforeCommit, + "Consuming profile must not change query semantics after commit"); + + int immutableSegmentsInspected = inspectImmutableSegments(realtimeTableName); + assertTrue(immutableSegmentsInspected > 0, + "Expected at least one immutable segment after forceCommit, got " + immutableSegmentsInspected); + } + + private int inspectConsumingSegments(String realtimeTableName) { + int inspected = 0; + for (BaseServerStarter serverStarter : getSharedServerStarters()) { + TableDataManager tableDataManager = serverStarter.getServerInstance().getInstanceDataManager() + .getTableDataManager(realtimeTableName); + assertNotNull(tableDataManager, "Missing table data manager on server"); 
+ List segmentDataManagers = tableDataManager.acquireAllSegments(); + try { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + IndexSegment segment = segmentDataManager.getSegment(); + if (!(segment instanceof MutableSegmentImpl)) { + continue; + } + DataSource profiled = segment.getDataSource(PROFILED_STRING_COLUMN); + assertNotNull(profiled.getDictionary(), + PROFILED_STRING_COLUMN + " must have a consuming-segment dictionary"); + assertNotNull(profiled.getInvertedIndex(), + PROFILED_STRING_COLUMN + " must have a consuming-segment inverted index"); + + DataSource control = segment.getDataSource(CONTROL_STRING_COLUMN); + assertNotNull(control.getDictionary(), CONTROL_STRING_COLUMN + " should keep default dictionary encoding"); + inspected++; + } + } finally { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + tableDataManager.releaseSegment(segmentDataManager); + } + } + } + return inspected; + } + + private int inspectImmutableSegments(String realtimeTableName) + throws Exception { + int inspected = 0; + for (BaseServerStarter serverStarter : getSharedServerStarters()) { + TableDataManager tableDataManager = serverStarter.getServerInstance().getInstanceDataManager() + .getTableDataManager(realtimeTableName); + assertNotNull(tableDataManager, "Missing table data manager on server"); + List segmentDataManagers = tableDataManager.acquireAllSegments(); + try { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + IndexSegment segment = segmentDataManager.getSegment(); + if (!(segment instanceof ImmutableSegmentImpl)) { + continue; + } + SegmentMetadata metadata = segment.getSegmentMetadata(); + ColumnMetadata profiledMetadata = metadata.getColumnMetadataFor(PROFILED_STRING_COLUMN); + assertNotNull(profiledMetadata, "Missing metadata for " + PROFILED_STRING_COLUMN); + assertFalse(profiledMetadata.hasDictionary(), + PROFILED_STRING_COLUMN + " must remain RAW/no-dictionary after commit"); + 
assertEquals(profiledMetadata.getForwardIndexEncoding(), FieldConfig.EncodingType.RAW, + PROFILED_STRING_COLUMN + " must persist with RAW forward-index encoding after commit"); + + ColumnMetadata controlMetadata = metadata.getColumnMetadataFor(CONTROL_STRING_COLUMN); + assertNotNull(controlMetadata, "Missing metadata for " + CONTROL_STRING_COLUMN); + assertTrue(controlMetadata.hasDictionary(), CONTROL_STRING_COLUMN + " must keep default dictionary encoding"); + + try (SegmentDirectory directory = new SegmentLocalFSDirectory(metadata.getIndexDir(), ReadMode.mmap); + SegmentDirectory.Reader reader = directory.createReader()) { + assertTrue(reader.hasIndexFor(PROFILED_STRING_COLUMN, StandardIndexes.forward()), + PROFILED_STRING_COLUMN + " RAW forward index must be persisted"); + assertFalse(reader.hasIndexFor(PROFILED_STRING_COLUMN, StandardIndexes.dictionary()), + PROFILED_STRING_COLUMN + " dictionary must not be persisted"); + assertFalse(reader.hasIndexFor(PROFILED_STRING_COLUMN, StandardIndexes.inverted()), + PROFILED_STRING_COLUMN + " inverted index must not be persisted"); + } + inspected++; + } + } finally { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + tableDataManager.releaseSegment(segmentDataManager); + } + } + } + return inspected; + } + + private void forceCommitAndWait(String realtimeTableName) + throws Exception { + Set consumingSegments = getConsumingSegments(realtimeTableName); + assertFalse(consumingSegments.isEmpty(), "Expected consuming segments before forceCommit"); + String response = getOrCreateAdminClient().getTableClient().forceCommit(realtimeTableName); + String jobId = JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText(); + + TestUtils.waitForCondition(aVoid -> { + try { + String jobStatusResponse = getOrCreateAdminClient().getTableClient().getForceCommitJobStatus(jobId); + JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse); + return 
jobStatus.get(CommonConstants.ControllerJob.NUM_CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED).asInt(-1) == 0; + } catch (Exception e) { + throw new RuntimeException(e); + } + }, 120_000L, "Timed out waiting for forceCommit job: " + jobId); + + TestUtils.waitForCondition(aVoid -> { + for (String segmentName : consumingSegments) { + SegmentZKMetadata metadata = getSharedHelixResourceManager().getSegmentZKMetadata(realtimeTableName, + segmentName); + if (metadata == null || metadata.getStatus() != CommonConstants.Segment.Realtime.Status.DONE) { + return false; + } + } + return true; + }, 120_000L, "Timed out waiting for force-committed segments to become DONE"); + + TestUtils.waitForCondition(aVoid -> areSegmentsImmutableOnServers(realtimeTableName, consumingSegments), + 120_000L, "Timed out waiting for force-committed segments to load as immutable on servers"); + } + + private boolean areSegmentsImmutableOnServers(String realtimeTableName, Set segmentNames) { + Set immutableSegments = new HashSet<>(); + for (BaseServerStarter serverStarter : getSharedServerStarters()) { + TableDataManager tableDataManager = serverStarter.getServerInstance().getInstanceDataManager() + .getTableDataManager(realtimeTableName); + if (tableDataManager == null) { + continue; + } + List segmentDataManagers = tableDataManager.acquireAllSegments(); + try { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + IndexSegment segment = segmentDataManager.getSegment(); + if (segment instanceof ImmutableSegmentImpl && segmentNames.contains(segment.getSegmentName())) { + immutableSegments.add(segment.getSegmentName()); + } + } + } finally { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + tableDataManager.releaseSegment(segmentDataManager); + } + } + } + return immutableSegments.containsAll(segmentNames); + } + + private Set getConsumingSegments(String realtimeTableName) { + IdealState idealState = 
getSharedHelixResourceManager().getTableIdealState(realtimeTableName); + Set consumingSegments = new HashSet<>(); + if (idealState == null) { + return consumingSegments; + } + for (String segmentName : idealState.getPartitionSet()) { + if (!LLCSegmentName.isLLCSegment(segmentName)) { + continue; + } + Map stateMap = idealState.getInstanceStateMap(segmentName); + if (stateMap != null && stateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) { + consumingSegments.add(segmentName); + } + } + return consumingSegments; + } + + private long queryStringCount() { + return Long.parseLong(getPinotConnection() + .execute("SELECT COUNT(*) FROM " + getTableName() + " WHERE " + PROFILED_STRING_COLUMN + " = '" + + STRING_VALUES[0] + "'") + .getResultSet(0).getString(0)); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java index 865e87b3bb95..5a09cb3f9ffa 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java @@ -46,6 +46,7 @@ import org.apache.pinot.integration.tests.BaseClusterIntegrationTest; import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties; +import org.apache.pinot.server.starter.helix.BaseServerStarter; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; @@ -305,6 +306,11 @@ protected BaseControllerStarter getSharedControllerStarter() { return _sharedClusterTestSuite._controllerStarter; } + /// Returns server starters from the shared suite 
instance. + protected List getSharedServerStarters() { + return _sharedClusterTestSuite._serverStarters; + } + /** * Returns the property store from the shared suite instance. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java index c7f623b6c90f..cfb937e27eb8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java @@ -43,6 +43,7 @@ import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.utils.IngestionUtils; +import org.apache.pinot.segment.local.utils.TableConfigUtils; import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -180,9 +181,12 @@ public StatelessRealtimeSegmentWriter(SegmentZKMetadata segmentZKMetadata, Index File statsHistoryFile = new File(tableDataDir, SEGMENT_STATS_FILE_NAME); RealtimeSegmentStatsHistory statsHistory = RealtimeSegmentStatsHistory.deserializeFrom(statsHistoryFile); - // Initialize mutable segment with configurations + // Initialize mutable segment with configurations. StatelessRealtimeSegmentWriter is a one-shot tool, so the + // fallback path relies on the error log instead of emitting a server metric. 
IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig(); - RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig) + RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = + TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig, _logger, null); + realtimeSegmentConfigBuilder .setTableNameWithType(_tableNameWithType) .setSegmentName(_segmentName) .setStreamName(_streamConfig.getTopicName()) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 31bea7e4a708..c495dda10876 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.utils; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -48,7 +49,9 @@ import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.segment.local.aggregator.ValueAggregator; import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.index.DictionaryIndexConfig; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; @@ -59,6 +62,8 @@ import 
org.apache.pinot.segment.spi.index.multicolumntext.MultiColumnTextMetadata; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.ConsumingSegmentFieldConfig; +import org.apache.pinot.spi.config.table.ConsumingSegmentIndexConfig; import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.FieldConfig.EncodingType; @@ -66,6 +71,7 @@ import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig; import org.apache.pinot.spi.config.table.QuotaConfig; +import org.apache.pinot.spi.config.table.RealtimeConfig; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -187,6 +193,7 @@ public static void validate(TableConfig tableConfig, Schema schema, @Nullable St } validateTierConfigList(tableConfig.getTierConfigsList()); validateIndexingConfigAndFieldConfigList(tableConfig, schema); + validateConsumingSegmentIndexConfig(tableConfig, schema); validateInstancePartitionsTypeMapConfig(tableConfig); validatePartitionedReplicaGroupInstance(tableConfig); validateInstanceAssignmentConfigs(tableConfig); @@ -2126,6 +2133,323 @@ private static void overwriteConfig(JsonNode oldCfg, JsonNode newCfg) { } } + // Top-level legacy tableIndexConfig lists that conflict with the first supported consuming profile index. 
+  private static final Set<String> CONSUMING_SEGMENT_PROFILE_TOP_LEVEL_INDEX_LIST_KEYS = Set.of("invertedIndexColumns");
+  private static final String NO_DICTIONARY_CONFIG_KEY = "noDictionaryConfig";
+  private static final String NO_DICTIONARY_COLUMNS_KEY = "noDictionaryColumns";
+  private static final String INVERTED_INDEX_KEY = "inverted";
+  private static final Set<String> SUPPORTED_CONSUMING_SEGMENT_PROFILE_INDEXES = Set.of(INVERTED_INDEX_KEY);
+
+  /// Returns `true` when the table has a consuming segment index profile.
+  public static boolean hasConsumingSegmentIndexConfig(TableConfig tableConfig) {
+    return getConsumingSegmentIndexConfig(tableConfig) != null;
+  }
+
+  @Nullable
+  private static ConsumingSegmentIndexConfig getConsumingSegmentIndexConfig(TableConfig tableConfig) {
+    RealtimeConfig realtimeConfig = tableConfig.getRealtimeConfig();
+    return realtimeConfig == null ? null : realtimeConfig.getConsumingSegmentIndexConfig();
+  }
+
+  /// Returns a [TableConfig] with `realtimeConfig.consumingSegmentIndexConfig` applied to the base table config.
+  /// The returned config is intended only for building realtime mutable consuming segments. Committed segments and
+  /// immutable loads must continue to use the persisted table config, plus normal storage-tier overwrites if a real
+  /// segment tier exists.
+  ///
+  /// Merge semantics are intentionally narrow:
+  /// - `encodingType` replaces the field config encoding when present.
+  /// - `indexes` replaces the field config `indexes` tree when present; only `inverted` is supported in this first
+  ///   version.
+  /// - legacy `invertedIndexColumns` and no-dictionary config entries are scrubbed only when they contradict the
+  ///   specific consuming profile override.
+  ///
+  /// @param tableConfig original table config; not mutated
+  /// @return a new table config with the consuming profile applied, or the original config when no profile exists
+  public static TableConfig applyConsumingSegmentIndexConfig(TableConfig tableConfig) {
+    return applyConsumingSegmentIndexConfig(tableConfig, null);
+  }
+
+  private static TableConfig applyConsumingSegmentIndexConfig(TableConfig tableConfig, @Nullable Schema schema) {
+    ConsumingSegmentIndexConfig consumingSegmentIndexConfig = getConsumingSegmentIndexConfig(tableConfig);
+    if (consumingSegmentIndexConfig == null) {
+      return tableConfig;
+    }
+    enforceConsumingSegmentIndexConfigInvariants(tableConfig, schema);
+    Set<String> indexOverrideColumns = new HashSet<>();
+    Set<String> dictionaryOverrideColumns = new HashSet<>();
+    for (ConsumingSegmentFieldConfig profileFieldConfig : consumingSegmentIndexConfig.getFieldConfigList()) {
+      String column = profileFieldConfig.getName();
+      if (profileFieldConfig.hasConfiguredIndexes()) {
+        indexOverrideColumns.add(column);
+      }
+      if (profileFieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY) {
+        dictionaryOverrideColumns.add(column);
+      }
+    }
+    try {
+      JsonNode tblCfgJson = tableConfig.toJsonNode().deepCopy();
+      Preconditions.checkState(tblCfgJson.isObject(), "Table config JSON must be an object for table: %s",
+          tableConfig.getTableName());
+      ObjectNode tblCfgObj = (ObjectNode) tblCfgJson;
+      ArrayNode fieldCfgListJson;
+      JsonNode fieldCfgListNode = tblCfgObj.get(TableConfig.FIELD_CONFIG_LIST_KEY);
+      if (fieldCfgListNode == null || fieldCfgListNode.isNull()) {
+        fieldCfgListJson = JsonUtils.newArrayNode();
+        tblCfgObj.set(TableConfig.FIELD_CONFIG_LIST_KEY, fieldCfgListJson);
+      } else {
+        Preconditions.checkState(fieldCfgListNode.isArray(),
+            "fieldConfigList must be an array in table config JSON for table: %s", tableConfig.getTableName());
+        fieldCfgListJson = (ArrayNode) fieldCfgListNode;
+      }
+
+      Map<String, ObjectNode> fieldConfigByColumn = new java.util.LinkedHashMap<>();
+      Iterator<JsonNode> fieldCfgItr = fieldCfgListJson.elements();
+      while (fieldCfgItr.hasNext()) {
+        JsonNode fieldCfgJson = fieldCfgItr.next();
+        if (!fieldCfgJson.isObject()) {
+          continue;
+        }
+        JsonNode nameNode = fieldCfgJson.get(ConsumingSegmentFieldConfig.NAME_KEY);
+        if (nameNode != null && nameNode.isTextual()) {
+          fieldConfigByColumn.put(nameNode.asText(), (ObjectNode) fieldCfgJson);
+        }
+      }
+
+      for (ConsumingSegmentFieldConfig profileFieldConfig : consumingSegmentIndexConfig.getFieldConfigList()) {
+        String column = profileFieldConfig.getName();
+        ObjectNode fieldCfgObj = fieldConfigByColumn.get(column);
+        if (fieldCfgObj == null) {
+          fieldCfgObj = JsonUtils.newObjectNode();
+          fieldCfgObj.put(ConsumingSegmentFieldConfig.NAME_KEY, column);
+          fieldCfgListJson.add(fieldCfgObj);
+          fieldConfigByColumn.put(column, fieldCfgObj);
+        }
+        FieldConfig.EncodingType encodingType = profileFieldConfig.getEncodingType();
+        if (encodingType != null) {
+          fieldCfgObj.put(ConsumingSegmentFieldConfig.ENCODING_TYPE_KEY, encodingType.name());
+        }
+        if (profileFieldConfig.hasConfiguredIndexes()) {
+          fieldCfgObj.set(ConsumingSegmentFieldConfig.INDEXES_KEY, profileFieldConfig.getIndexes().deepCopy());
+          fieldCfgObj.remove("indexType");
+          fieldCfgObj.remove("indexTypes");
+        }
+      }
+
+      JsonNode tblIdxCfgJson = tblCfgObj.get(TableConfig.INDEXING_CONFIG_KEY);
+      if (tblIdxCfgJson != null && tblIdxCfgJson.isObject()) {
+        scrubConsumingSegmentOverridesFromTableIndexConfig((ObjectNode) tblIdxCfgJson, indexOverrideColumns,
+            dictionaryOverrideColumns);
+      }
+      tblCfgObj.remove(TableConfig.REALTIME_CONFIG_KEY);
+      return JsonUtils.jsonNodeToObject(tblCfgObj, TableConfig.class);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "Failed to apply consuming segment index config for table: " + tableConfig.getTableName(), e);
+    }
+  }
+
+  private static void scrubConsumingSegmentOverridesFromTableIndexConfig(ObjectNode tblIdxCfgObj,
+      Set<String> indexOverrideColumns, Set<String> dictionaryOverrideColumns) {
+    Iterator<Map.Entry<String, JsonNode>> entries = tblIdxCfgObj.properties().iterator();
+    while (entries.hasNext()) {
+      Map.Entry<String, JsonNode> entry = entries.next();
+      String key = entry.getKey();
+      JsonNode value = entry.getValue();
+      if (NO_DICTIONARY_CONFIG_KEY.equals(key) && value != null && value.isObject()) {
+        for (String dictionaryOverrideColumn : dictionaryOverrideColumns) {
+          ((ObjectNode) value).remove(dictionaryOverrideColumn);
+        }
+        continue;
+      }
+      Set<String> columnsToScrub;
+      if (NO_DICTIONARY_COLUMNS_KEY.equals(key)) {
+        columnsToScrub = dictionaryOverrideColumns;
+      } else if (CONSUMING_SEGMENT_PROFILE_TOP_LEVEL_INDEX_LIST_KEYS.contains(key)) {
+        columnsToScrub = indexOverrideColumns;
+      } else {
+        continue;
+      }
+      if (columnsToScrub.isEmpty()) {
+        continue;
+      }
+      if (value == null || !value.isArray() || value.isEmpty()) {
+        continue;
+      }
+      ArrayNode arr = (ArrayNode) value;
+      for (JsonNode element : arr) {
+        if (!element.isTextual()) {
+          arr = null;
+          break;
+        }
+      }
+      if (arr == null) {
+        continue;
+      }
+      List<JsonNode> kept = new ArrayList<>(arr.size());
+      for (JsonNode element : arr) {
+        if (!columnsToScrub.contains(element.asText())) {
+          kept.add(element);
+        }
+      }
+      if (kept.size() != arr.size()) {
+        ArrayNode replacement = JsonUtils.newArrayNode();
+        for (JsonNode element : kept) {
+          replacement.add(element);
+        }
+        tblIdxCfgObj.set(key, replacement);
+      }
+    }
+  }
+
+  /// Builds the [RealtimeSegmentConfig.Builder] for a mutable consuming segment, applying
+  /// `realtimeConfig.consumingSegmentIndexConfig` when present. Storage-tier overwrites are intentionally not applied
+  /// on the profile path: tier overwrites describe immutable segment storage placement/load behavior, while this
+  /// profile describes only the mutable consuming lifecycle.
+  public static RealtimeSegmentConfig.Builder buildConsumingSegmentConfigBuilder(TableConfig tableConfig,
+      Schema schema, IndexLoadingConfig indexLoadingConfig, Logger logger, @Nullable Runnable onFallback) {
+    if (hasConsumingSegmentIndexConfig(tableConfig)) {
+      try {
+        TableConfig consumingTableConfig = applyConsumingSegmentIndexConfig(tableConfig, schema);
+        Schema schemaCopy;
+        try {
+          schemaCopy = JsonUtils.jsonNodeToObject(schema.toJsonObject(), Schema.class);
+        } catch (IOException e) {
+          throw new IllegalStateException(
+              "Failed to clone schema for consuming segment index config on table: " + tableConfig.getTableName(), e);
+        }
+        IndexLoadingConfig consumingIndexLoadingConfig =
+            new IndexLoadingConfig(indexLoadingConfig.getInstanceDataManagerConfig(), consumingTableConfig, schemaCopy);
+        return new RealtimeSegmentConfig.Builder(consumingIndexLoadingConfig);
+      } catch (RuntimeException e) {
+        logger.error("Failed to apply consuming segment index config for table: {} (profile: {}); falling back to "
+            + "persisted shape for this consuming segment", tableConfig.getTableName(),
+            summarizeConsumingSegmentIndexConfig(tableConfig), e);
+        if (onFallback != null) {
+          onFallback.run();
+        }
+      }
+    }
+    return new RealtimeSegmentConfig.Builder(indexLoadingConfig);
+  }
+
+  private static String summarizeConsumingSegmentIndexConfig(TableConfig tableConfig) {
+    ConsumingSegmentIndexConfig config = getConsumingSegmentIndexConfig(tableConfig);
+    return config == null ? "{}" : config.toJsonString();
+  }
+
+  private static void validateConsumingSegmentIndexConfig(TableConfig tableConfig, Schema schema) {
+    RealtimeConfig realtimeConfig = tableConfig.getRealtimeConfig();
+    if (realtimeConfig == null) {
+      return;
+    }
+    Preconditions.checkArgument(realtimeConfig.getUnknownProperties().isEmpty(),
+        "Unknown realtimeConfig keys: %s; allowed keys: [%s]", realtimeConfig.getUnknownProperties().keySet(),
+        RealtimeConfig.CONSUMING_SEGMENT_INDEX_CONFIG_KEY);
+    ConsumingSegmentIndexConfig consumingSegmentIndexConfig = realtimeConfig.getConsumingSegmentIndexConfig();
+    if (consumingSegmentIndexConfig == null) {
+      return;
+    }
+    Set<String> profiledColumns = enforceConsumingSegmentIndexConfigInvariants(tableConfig, schema);
+    TableConfig merged = applyConsumingSegmentIndexConfig(tableConfig, schema);
+    if (merged != tableConfig) {
+      Schema schemaCopy;
+      try {
+        schemaCopy = JsonUtils.jsonNodeToObject(schema.toJsonObject(), Schema.class);
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to clone schema while validating consuming segment index config for "
+            + "table: " + tableConfig.getTableName(), e);
+      }
+      try {
+        validateIndexingConfigAndFieldConfigList(merged, schemaCopy);
+      } catch (RuntimeException e) {
+        throw new IllegalStateException(
+            "realtimeConfig.consumingSegmentIndexConfig for columns " + profiledColumns
+                + " produces an invalid mutable consuming shape: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  private static Set<String> enforceConsumingSegmentIndexConfigInvariants(TableConfig tableConfig,
+      @Nullable Schema schema) {
+    ConsumingSegmentIndexConfig consumingSegmentIndexConfig = getConsumingSegmentIndexConfig(tableConfig);
+    Preconditions.checkState(consumingSegmentIndexConfig != null,
+        "Missing consuming segment index config for table: %s", tableConfig.getTableName());
+    Preconditions.checkArgument(tableConfig.getTableType() == TableType.REALTIME,
+        "realtimeConfig.consumingSegmentIndexConfig is only supported on REALTIME tables; table %s is %s",
+        tableConfig.getTableName(), tableConfig.getTableType());
+    Preconditions.checkArgument(consumingSegmentIndexConfig.getUnknownProperties().isEmpty(),
+        "Unknown realtimeConfig.consumingSegmentIndexConfig keys: %s; allowed keys: [%s]",
+        consumingSegmentIndexConfig.getUnknownProperties().keySet(),
+        ConsumingSegmentIndexConfig.FIELD_CONFIG_LIST_KEY);
+    List<ConsumingSegmentFieldConfig> fieldConfigList = consumingSegmentIndexConfig.getFieldConfigList();
+    Preconditions.checkArgument(CollectionUtils.isNotEmpty(fieldConfigList),
+        "realtimeConfig.consumingSegmentIndexConfig.fieldConfigList must be non-empty");
+
+    Set<String> profiledColumns = new HashSet<>();
+    for (ConsumingSegmentFieldConfig fieldConfig : fieldConfigList) {
+      String column = fieldConfig.getName();
+      Preconditions.checkArgument(StringUtils.isNotBlank(column),
+          "Each consuming segment field config must provide a non-blank name");
+      Preconditions.checkArgument(profiledColumns.add(column),
+          "Duplicate consuming segment field config for column: %s", column);
+      if (schema != null) {
+        Preconditions.checkArgument(schema.getFieldSpecFor(column) != null,
+            "Consuming segment field config references unknown column: %s", column);
+      }
+      Preconditions.checkArgument(fieldConfig.getUnknownProperties().isEmpty(),
+          "Unknown consuming segment field config keys on column %s: %s; allowed keys: [%s, %s, %s]",
+          column, fieldConfig.getUnknownProperties().keySet(), ConsumingSegmentFieldConfig.NAME_KEY,
+          ConsumingSegmentFieldConfig.ENCODING_TYPE_KEY, ConsumingSegmentFieldConfig.INDEXES_KEY);
+      Preconditions.checkArgument(fieldConfig.getEncodingType() != null || fieldConfig.hasConfiguredIndexes(),
+          "Consuming segment field config for column %s must configure at least one of encodingType or indexes",
+          column);
+      if (fieldConfig.hasConfiguredIndexes()) {
+        validateConsumingSegmentProfileIndexes(column, fieldConfig.getIndexes());
+      }
+    }
+    enforceConsumingSegmentIndexConfigStructuralInvariants(tableConfig, profiledColumns);
+    return profiledColumns;
+  }
+
+  private static void validateConsumingSegmentProfileIndexes(String column, JsonNode indexes) {
+    Preconditions.checkArgument(indexes.isObject(),
+        "Consuming segment field config indexes must be a JSON object on column %s; got: %s",
+        column, indexes.getNodeType());
+    Iterator<String> indexNames = indexes.fieldNames();
+    while (indexNames.hasNext()) {
+      String indexName = indexNames.next();
+      Preconditions.checkArgument(SUPPORTED_CONSUMING_SEGMENT_PROFILE_INDEXES.contains(indexName),
+          "Unsupported consuming segment index '%s' on column %s; supported indexes: %s",
+          indexName, column, SUPPORTED_CONSUMING_SEGMENT_PROFILE_INDEXES);
+      JsonNode indexConfig = indexes.get(indexName);
+      Preconditions.checkArgument(indexConfig == null || indexConfig.isObject(),
+          "Consuming segment index '%s' on column %s must be a JSON object; got: %s",
+          indexName, column, indexConfig == null ? "null" : indexConfig.getNodeType());
+    }
+  }
+
+  private static void enforceConsumingSegmentIndexConfigStructuralInvariants(TableConfig tableConfig,
+      Set<String> profiledColumns) {
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    if (indexingConfig == null || profiledColumns.isEmpty()) {
+      return;
+    }
+    List<String> sortedColumns = indexingConfig.getSortedColumn();
+    if (sortedColumns != null) {
+      for (String sortedColumn : sortedColumns) {
+        Preconditions.checkState(!profiledColumns.contains(sortedColumn),
+            "Consuming segment index profile is not allowed on sorted column: %s", sortedColumn);
+      }
+    }
+    SegmentPartitionConfig segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap() != null) {
+      for (String partitionColumn : segmentPartitionConfig.getColumnPartitionMap().keySet()) {
+        Preconditions.checkState(!profiledColumns.contains(partitionColumn),
+            "Consuming segment index profile is not allowed on partition column: %s", partitionColumn);
+      }
+    }
+  }
+
   /**
    * Get the
partition column from tableConfig instance assignment config map. * @param tableConfig table config diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentIndexConfigTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentIndexConfigTest.java new file mode 100644 index 000000000000..08da939a879e --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigConsumingSegmentIndexConfigTest.java @@ -0,0 +1,342 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.utils; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.ConsumingSegmentFieldConfig; +import org.apache.pinot.spi.config.table.ConsumingSegmentIndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.RealtimeConfig; +import org.apache.pinot.spi.config.table.SegmentPartitionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + + +/// Tests for [TableConfigUtils#applyConsumingSegmentIndexConfig(TableConfig)] and the validation rules around +/// `realtimeConfig.consumingSegmentIndexConfig`. The profile is mutable-consuming-only: committed immutable +/// segments continue to use the persisted table config. 
+public class TableConfigConsumingSegmentIndexConfigTest { + private static final String TABLE_NAME = "consumingProfileTable"; + private static final String TIME_COLUMN = "ts"; + + private static Schema buildSchema() { + return new Schema.SchemaBuilder().setSchemaName(TABLE_NAME) + .addSingleValueDimension("colA", org.apache.pinot.spi.data.FieldSpec.DataType.STRING) + .addSingleValueDimension("colB", org.apache.pinot.spi.data.FieldSpec.DataType.STRING) + .addDateTimeField(TIME_COLUMN, org.apache.pinot.spi.data.FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", + "1:MILLISECONDS") + .build(); + } + + private static Map streamConfigs() { + return Map.of( + "streamType", "kafka", + "stream.kafka.topic.name", TABLE_NAME, + "stream.kafka.consumer.type", "lowlevel", + "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", + "stream.kafka.consumer.factory.class.name", + "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory", + "stream.kafka.broker.list", "localhost:9092", + "realtime.segment.flush.threshold.rows", "1000"); + } + + private static ObjectNode invertedIndexConfig() { + ObjectNode indexes = JsonUtils.newObjectNode(); + indexes.set("inverted", JsonUtils.newObjectNode().put("enabled", true)); + return indexes; + } + + private static RealtimeConfig consumingProfile(String column, FieldConfig.EncodingType encodingType, + ObjectNode indexes) { + return new RealtimeConfig(new ConsumingSegmentIndexConfig(List.of( + new ConsumingSegmentFieldConfig(column, encodingType, indexes)))); + } + + private static TableConfig baseRealtimeTable(RealtimeConfig realtimeConfig) { + FieldConfig persistedRaw = new FieldConfig.Builder("colA") + .withEncodingType(FieldConfig.EncodingType.RAW) + .build(); + return new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(persistedRaw)) + .setNoDictionaryColumns(List.of("colA")) + 
.setRealtimeConfig(realtimeConfig) + .build(); + } + + @Test + public void applyConsumingSegmentIndexConfigUpgradesMutableShapeOnly() { + TableConfig tableConfig = + baseRealtimeTable(consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, invertedIndexConfig())); + + TableConfig consumingView = TableConfigUtils.applyConsumingSegmentIndexConfig(tableConfig); + + assertNotSame(consumingView, tableConfig); + FieldConfig consumingCol = consumingView.getFieldConfigList().get(0); + assertEquals(consumingCol.getEncodingType(), FieldConfig.EncodingType.DICTIONARY); + assertTrue(consumingCol.getIndexes().path("inverted").path("enabled").asBoolean(false)); + assertFalse(consumingView.getIndexingConfig().getNoDictionaryColumns().contains("colA")); + + FieldConfig persistedCol = tableConfig.getFieldConfigList().get(0); + assertEquals(persistedCol.getEncodingType(), FieldConfig.EncodingType.RAW); + assertTrue(tableConfig.getIndexingConfig().getNoDictionaryColumns().contains("colA")); + assertTrue(persistedCol.getIndexes().isNull(), "apply must not mutate the persisted table config"); + } + + @Test + public void applyConsumingSegmentIndexConfigScrubsNoDictionaryConfig() { + TableConfig tableConfig = + baseRealtimeTable(consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, invertedIndexConfig())); + tableConfig.getIndexingConfig().setNoDictionaryConfig(Map.of("colA", "RAW", "colB", "RAW")); + + TableConfig consumingView = TableConfigUtils.applyConsumingSegmentIndexConfig(tableConfig); + + assertFalse(consumingView.getIndexingConfig().getNoDictionaryConfig().containsKey("colA")); + assertEquals(consumingView.getIndexingConfig().getNoDictionaryConfig().get("colB"), "RAW"); + assertTrue(tableConfig.getIndexingConfig().getNoDictionaryConfig().containsKey("colA"), + "apply must not mutate noDictionaryConfig on the persisted table config"); + } + + @Test + public void applyEncodingOnlyProfilePreservesTopLevelIndexLists() { + TableConfig tableConfig = 
baseRealtimeTable(consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, null)); + tableConfig.getIndexingConfig().setInvertedIndexColumns(List.of("colA", "colB")); + + TableConfig consumingView = TableConfigUtils.applyConsumingSegmentIndexConfig(tableConfig); + + assertTrue(consumingView.getIndexingConfig().getInvertedIndexColumns().contains("colA"), + "Encoding-only profiles must not clear existing top-level index lists"); + assertTrue(consumingView.getIndexingConfig().getInvertedIndexColumns().contains("colB")); + assertFalse(consumingView.getIndexingConfig().getNoDictionaryColumns().contains("colA"), + "DICTIONARY encoding override must clear legacy no-dictionary config"); + assertTrue(tableConfig.getIndexingConfig().getNoDictionaryColumns().contains("colA"), + "apply must not mutate noDictionaryColumns on the persisted table config"); + } + + @Test + public void applyIndexOverrideProfileScrubsOnlyIndexConfigs() { + TableConfig tableConfig = baseRealtimeTable(consumingProfile("colA", null, JsonUtils.newObjectNode())); + tableConfig.getIndexingConfig().setInvertedIndexColumns(List.of("colA", "colB")); + tableConfig.getIndexingConfig().setOnHeapDictionaryColumns(List.of("colA", "colB")); + tableConfig.getIndexingConfig().setVarLengthDictionaryColumns(List.of("colA", "colB")); + + TableConfig consumingView = TableConfigUtils.applyConsumingSegmentIndexConfig(tableConfig); + + assertFalse(consumingView.getIndexingConfig().getInvertedIndexColumns().contains("colA")); + assertTrue(consumingView.getIndexingConfig().getInvertedIndexColumns().contains("colB")); + assertTrue(consumingView.getIndexingConfig().getOnHeapDictionaryColumns().contains("colA"), + "Index-only profiles must not clear dictionary implementation config"); + assertTrue(consumingView.getIndexingConfig().getVarLengthDictionaryColumns().contains("colA"), + "Index-only profiles must not clear dictionary implementation config"); + 
assertTrue(consumingView.getIndexingConfig().getNoDictionaryColumns().contains("colA"), + "Index-only profiles must not clear no-dictionary config unless encoding is overridden"); + assertTrue(consumingView.getFieldConfigList().get(0).getIndexes().isObject()); + assertTrue(consumingView.getFieldConfigList().get(0).getIndexes().isEmpty()); + } + + @Test + public void applyIndexOverrideProfileClearsLegacyFieldIndexTypes() { + FieldConfig legacyInverted = new FieldConfig.Builder("colA") + .withEncodingType(FieldConfig.EncodingType.DICTIONARY) + .withIndexTypes(List.of(FieldConfig.IndexType.INVERTED)) + .build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(legacyInverted)) + .setRealtimeConfig(consumingProfile("colA", null, JsonUtils.newObjectNode())) + .build(); + + TableConfig consumingView = TableConfigUtils.applyConsumingSegmentIndexConfig(tableConfig); + + assertTrue(consumingView.getFieldConfigList().get(0).getIndexTypes().isEmpty()); + assertTrue(tableConfig.getFieldConfigList().get(0).getIndexTypes().contains(FieldConfig.IndexType.INVERTED), + "apply must not mutate legacy indexTypes on the persisted table config"); + } + + @Test + public void applyConsumingSegmentIndexConfigReturnsSameTableWhenAbsent() { + TableConfig tableConfig = baseRealtimeTable(null); + + assertSame(TableConfigUtils.applyConsumingSegmentIndexConfig(tableConfig), tableConfig); + } + + @Test + public void validateRejectsOfflineTable() { + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setRealtimeConfig(consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, invertedIndexConfig())) + .build(); + + assertValidationFails(tableConfig, "only supported on REALTIME tables"); + } + + @Test + public void validateRejectsUnsupportedIndex() { + ObjectNode indexes = 
JsonUtils.newObjectNode(); + indexes.set("text", JsonUtils.newObjectNode().put("enabled", true)); + TableConfig tableConfig = baseRealtimeTable(consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, indexes)); + + assertValidationFails(tableConfig, "Unsupported consuming segment index"); + } + + @Test + public void validateRejectsUnknownFieldConfigKey() { + try { + ObjectNode tableConfigJson = (ObjectNode) baseRealtimeTable( + consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, invertedIndexConfig())).toJsonNode(); + ObjectNode profileFieldJson = (ObjectNode) tableConfigJson.path("realtimeConfig") + .path("consumingSegmentIndexConfig").path("fieldConfigList").get(0); + profileFieldJson.putArray("indexTypes").add("INVERTED"); + TableConfigUtils.validate(JsonUtils.jsonNodeToObject(tableConfigJson, TableConfig.class), buildSchema()); + fail("Expected validation failure for unknown consuming segment field config key"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Unknown consuming segment field config keys"), + "Expected unknown-key message, got: " + e.getMessage()); + } + } + + @Test + public void validateRejectsUnknownRealtimeConfigKey() { + try { + ObjectNode tableConfigJson = (ObjectNode) baseRealtimeTable(null).toJsonNode(); + ObjectNode realtimeConfigJson = JsonUtils.newObjectNode(); + realtimeConfigJson.set("consumingSegmentIndexConfigs", JsonUtils.newObjectNode()); + tableConfigJson.set("realtimeConfig", realtimeConfigJson); + TableConfigUtils.validate(JsonUtils.jsonNodeToObject(tableConfigJson, TableConfig.class), buildSchema()); + fail("Expected validation failure for unknown realtime config key"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Unknown realtimeConfig keys"), + "Expected unknown-key message, got: " + e.getMessage()); + } + } + + @Test + public void validateRejectsSortedColumn() { + TableConfig tableConfig = baseRealtimeTable( + consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, 
invertedIndexConfig())); + tableConfig.getIndexingConfig().setSortedColumn(List.of("colA")); + + assertValidationFails(tableConfig, "sorted column"); + } + + @Test + public void validateRejectsPartitionColumn() { + TableConfig tableConfig = baseRealtimeTable( + consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, invertedIndexConfig())); + tableConfig.getIndexingConfig().setSegmentPartitionConfig( + new SegmentPartitionConfig(Map.of("colA", new ColumnPartitionConfig("Murmur", 4)))); + + assertValidationFails(tableConfig, "partition column"); + } + + @Test + public void buildConsumingSegmentConfigBuilderAppliesProfile() { + TableConfig tableConfig = + baseRealtimeTable(consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, invertedIndexConfig())); + IndexLoadingConfig ilc = new IndexLoadingConfig(tableConfig, buildSchema()); + + RealtimeSegmentConfig built = TableConfigUtils.buildConsumingSegmentConfigBuilder( + tableConfig, buildSchema(), ilc, LoggerFactory.getLogger(TableConfigConsumingSegmentIndexConfigTest.class), + null) + .build(); + + FieldIndexConfigs colA = built.getIndexConfigByCol().get("colA"); + assertTrue(colA.getConfig(StandardIndexes.dictionary()).isEnabled()); + assertTrue(colA.getConfig(StandardIndexes.inverted()).isEnabled()); + } + + @Test + public void buildConsumingSegmentConfigBuilderFallsBackOnInvalidProfile() { + ObjectNode indexes = JsonUtils.newObjectNode(); + indexes.set("text", JsonUtils.newObjectNode().put("enabled", true)); + TableConfig tableConfig = baseRealtimeTable(consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, indexes)); + IndexLoadingConfig ilc = new IndexLoadingConfig(tableConfig, buildSchema()); + AtomicInteger fallbackInvocations = new AtomicInteger(); + + RealtimeSegmentConfig built = TableConfigUtils.buildConsumingSegmentConfigBuilder( + tableConfig, buildSchema(), ilc, LoggerFactory.getLogger(TableConfigConsumingSegmentIndexConfigTest.class), + fallbackInvocations::incrementAndGet) + .build(); 
+ + FieldIndexConfigs colA = built.getIndexConfigByCol().get("colA"); + assertFalse(colA.getConfig(StandardIndexes.dictionary()).isEnabled(), + "Fallback must use the persisted RAW/no-dictionary shape"); + assertEquals(fallbackInvocations.get(), 1); + } + + @Test + public void consumingProfileDoesNotApplyStorageTierOverwrites() + throws Exception { + FieldConfig persistedRawWithTierOverwrite = new FieldConfig.Builder("colA") + .withEncodingType(FieldConfig.EncodingType.RAW) + .withTierOverwrites(JsonUtils.stringToJsonNode("{\"coldTier\":{\"encodingType\":\"RAW\",\"indexes\":{}}}")) + .build(); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME) + .setTimeColumnName(TIME_COLUMN) + .setStreamConfigs(streamConfigs()) + .setFieldConfigList(List.of(persistedRawWithTierOverwrite)) + .setNoDictionaryColumns(List.of("colA")) + .setRealtimeConfig(consumingProfile("colA", FieldConfig.EncodingType.DICTIONARY, invertedIndexConfig())) + .build(); + IndexLoadingConfig ilc = new IndexLoadingConfig(tableConfig, buildSchema()); + ilc.setSegmentTier("coldTier"); + + RealtimeSegmentConfig built = TableConfigUtils.buildConsumingSegmentConfigBuilder( + tableConfig, buildSchema(), ilc, LoggerFactory.getLogger(TableConfigConsumingSegmentIndexConfigTest.class), + null) + .build(); + + FieldIndexConfigs colA = built.getIndexConfigByCol().get("colA"); + assertTrue(colA.getConfig(StandardIndexes.dictionary()).isEnabled(), + "Consuming profile is lifecycle-scoped and must not be overwritten by storage-tier config"); + assertTrue(colA.getConfig(StandardIndexes.inverted()).isEnabled()); + } + + private static void assertValidationFails(TableConfig tableConfig, String expectedMessagePart) { + try { + TableConfigUtils.validate(tableConfig, buildSchema()); + fail("Expected validation failure containing: " + expectedMessagePart); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains(expectedMessagePart), + "Expected message containing '" + 
expectedMessagePart + "', got: " + e.getMessage()); + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ConsumingSegmentFieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ConsumingSegmentFieldConfig.java new file mode 100644 index 000000000000..fbcf0ae9f54a --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ConsumingSegmentFieldConfig.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table; + +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import java.util.LinkedHashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.spi.config.BaseJsonConfig; + + +/// Field-level index profile applied only while a realtime segment is mutable and consuming. +/// +/// This config intentionally exposes a narrow subset of [FieldConfig]: `name`, `encodingType`, and `indexes`. 
+/// Validation rejects all other keys so mutable consuming segment shape cannot diverge from the surrounding
+/// ingestion pipeline in unsupported ways.
+public class ConsumingSegmentFieldConfig extends BaseJsonConfig {
+  public static final String NAME_KEY = "name";
+  public static final String ENCODING_TYPE_KEY = "encodingType";
+  public static final String INDEXES_KEY = "indexes";
+
+  private final String _name;
+  private final FieldConfig.EncodingType _encodingType;
+  private final JsonNode _indexes;
+  private final Map<String, JsonNode> _unknownProperties = new LinkedHashMap<>();
+
+  @JsonCreator
+  public ConsumingSegmentFieldConfig(@JsonProperty(value = NAME_KEY, required = true) String name,
+      @JsonProperty(ENCODING_TYPE_KEY) @Nullable FieldConfig.EncodingType encodingType,
+      @JsonProperty(INDEXES_KEY) @Nullable JsonNode indexes) {
+    _name = name;
+    _encodingType = encodingType;
+    _indexes = indexes == null ? NullNode.getInstance() : indexes;
+  }
+
+  @JsonProperty(NAME_KEY)
+  public String getName() {
+    return _name;
+  }
+
+  @JsonProperty(ENCODING_TYPE_KEY)
+  @Nullable
+  public FieldConfig.EncodingType getEncodingType() {
+    return _encodingType;
+  }
+
+  @JsonProperty(INDEXES_KEY)
+  public JsonNode getIndexes() {
+    return _indexes;
+  }
+
+  @JsonAnySetter
+  public void setUnknownProperty(String key, JsonNode value) {
+    _unknownProperties.put(key, value);
+  }
+
+  @JsonIgnore
+  public Map<String, JsonNode> getUnknownProperties() {
+    return _unknownProperties;
+  }
+
+  @JsonIgnore
+  public boolean hasConfiguredIndexes() {
+    return !_indexes.isNull();
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ConsumingSegmentIndexConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ConsumingSegmentIndexConfig.java
new file mode 100644
index 000000000000..ea4c5195199a
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ConsumingSegmentIndexConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+
* or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/// Index profile applied only to realtime mutable consuming segments.
+///
+/// The committed segment and immutable segment loads continue to use the persisted table config, optionally with
+/// real storage-tier overwrites. This profile is intentionally lifecycle-scoped and is not a storage tier.
+public class ConsumingSegmentIndexConfig extends BaseJsonConfig {
+  public static final String FIELD_CONFIG_LIST_KEY = "fieldConfigList";
+
+  private final List<ConsumingSegmentFieldConfig> _fieldConfigList;
+  private final Map<String, JsonNode> _unknownProperties = new LinkedHashMap<>();
+
+  @JsonCreator
+  public ConsumingSegmentIndexConfig(
+      @JsonProperty(FIELD_CONFIG_LIST_KEY) @Nullable List<ConsumingSegmentFieldConfig> fieldConfigList) {
+    _fieldConfigList = fieldConfigList;
+  }
+
+  @JsonProperty(FIELD_CONFIG_LIST_KEY)
+  @Nullable
+  public List<ConsumingSegmentFieldConfig> getFieldConfigList() {
+    return _fieldConfigList;
+  }
+
+  @JsonAnySetter
+  public void setUnknownProperty(String key, JsonNode value) {
+    _unknownProperties.put(key, value);
+  }
+
+  @JsonIgnore
+  public Map<String, JsonNode> getUnknownProperties() {
+    return _unknownProperties;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RealtimeConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RealtimeConfig.java
new file mode 100644
index 000000000000..37e58078985a
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RealtimeConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/// Realtime-specific table config.
+///
+/// Keep lifecycle-specific realtime controls here instead of overloading storage-tier config. The first supported
+/// setting is [#getConsumingSegmentIndexConfig()], which affects only mutable consuming segment construction.
+public class RealtimeConfig extends BaseJsonConfig {
+  public static final String CONSUMING_SEGMENT_INDEX_CONFIG_KEY = "consumingSegmentIndexConfig";
+
+  private final ConsumingSegmentIndexConfig _consumingSegmentIndexConfig;
+  private final Map<String, JsonNode> _unknownProperties = new LinkedHashMap<>();
+
+  @JsonCreator
+  public RealtimeConfig(
+      @JsonProperty(CONSUMING_SEGMENT_INDEX_CONFIG_KEY) @Nullable ConsumingSegmentIndexConfig
+          consumingSegmentIndexConfig) {
+    _consumingSegmentIndexConfig = consumingSegmentIndexConfig;
+  }
+
+  @JsonProperty(CONSUMING_SEGMENT_INDEX_CONFIG_KEY)
+  @Nullable
+  public ConsumingSegmentIndexConfig getConsumingSegmentIndexConfig() {
+    return _consumingSegmentIndexConfig;
+  }
+
+  @JsonAnySetter
+  public void setUnknownProperty(String key, JsonNode value) {
+    _unknownProperties.put(key, value);
+  }
+
+  @JsonIgnore
+  public Map<String, JsonNode> getUnknownProperties() {
+    return _unknownProperties;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 7d214bf8d678..906903ca1dc1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java @@ -53,6 +53,7 @@ public class TableConfig extends BaseJsonConfig { public static final String TASK_CONFIG_KEY = "task"; public static final String ROUTING_CONFIG_KEY = "routing"; public static final String QUERY_CONFIG_KEY = "query"; + public static final String REALTIME_CONFIG_KEY = "realtimeConfig"; public static final String INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY = "instanceAssignmentConfigMap"; public static final String INSTANCE_PARTITIONS_MAP_CONFIG_KEY = "instancePartitionsMap"; public static final String SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY = "segmentAssignmentConfigMap"; @@ -105,6 +106,7 @@ public class TableConfig extends BaseJsonConfig { private TableTaskConfig _taskConfig; private RoutingConfig _routingConfig; private QueryConfig _queryConfig; + private RealtimeConfig _realtimeConfig; private Map _instanceAssignmentConfigMap; @JsonPropertyDescription(value = "Point to an existing instance partitions") @@ -134,7 +136,6 @@ public class TableConfig extends BaseJsonConfig { @JsonPropertyDescription(value = "Configs for table samplers") private List _tableSamplers; - @JsonCreator public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) String tableName, @JsonProperty(value = TABLE_TYPE_KEY, required = true) String tableType, @JsonProperty(value = VALIDATION_CONFIG_KEY, required = true) @@ -161,6 +162,40 @@ public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) String @JsonProperty(SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable Map segmentAssignmentConfigMap, @JsonProperty(TABLE_SAMPLERS_KEY) @Nullable List tableSamplers) { + this(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig, quotaConfig, taskConfig, + routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig, dedupConfig, + dimensionTableConfig, ingestionConfig, tierConfigsList, dimTable, tunerConfigList, instancePartitionsMap, + 
segmentAssignmentConfigMap, tableSamplers, null); + } + + @JsonCreator + public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) String tableName, + @JsonProperty(value = TABLE_TYPE_KEY, required = true) String tableType, + @JsonProperty(value = VALIDATION_CONFIG_KEY, required = true) + SegmentsValidationAndRetentionConfig validationConfig, + @JsonProperty(value = TENANT_CONFIG_KEY, required = true) TenantConfig tenantConfig, + @JsonProperty(value = INDEXING_CONFIG_KEY, required = true) IndexingConfig indexingConfig, + @JsonProperty(value = CUSTOM_CONFIG_KEY) TableCustomConfig customConfig, + @JsonProperty(QUOTA_CONFIG_KEY) @Nullable QuotaConfig quotaConfig, + @JsonProperty(TASK_CONFIG_KEY) @Nullable TableTaskConfig taskConfig, + @JsonProperty(ROUTING_CONFIG_KEY) @Nullable RoutingConfig routingConfig, + @JsonProperty(QUERY_CONFIG_KEY) @Nullable QueryConfig queryConfig, + @JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable + Map instanceAssignmentConfigMap, + @JsonProperty(FIELD_CONFIG_LIST_KEY) @Nullable List fieldConfigList, + @JsonProperty(UPSERT_CONFIG_KEY) @Nullable UpsertConfig upsertConfig, + @JsonProperty(DEDUP_CONFIG_KEY) @Nullable DedupConfig dedupConfig, + @JsonProperty(DIMENSION_TABLE_CONFIG_KEY) @Nullable DimensionTableConfig dimensionTableConfig, + @JsonProperty(INGESTION_CONFIG_KEY) @Nullable IngestionConfig ingestionConfig, + @JsonProperty(TIER_CONFIGS_LIST_KEY) @Nullable List tierConfigsList, + @JsonProperty(IS_DIM_TABLE_KEY) boolean dimTable, + @JsonProperty(TUNER_CONFIG_LIST_KEY) @Nullable List tunerConfigList, + @JsonProperty(INSTANCE_PARTITIONS_MAP_CONFIG_KEY) @Nullable + Map instancePartitionsMap, + @JsonProperty(SEGMENT_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable + Map segmentAssignmentConfigMap, + @JsonProperty(TABLE_SAMPLERS_KEY) @Nullable List tableSamplers, + @JsonProperty(REALTIME_CONFIG_KEY) @Nullable RealtimeConfig realtimeConfig) { Preconditions.checkArgument(tableName != null, "'tableName' must be configured"); 
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING), "'tableName' cannot contain double underscore ('__')"); @@ -180,6 +215,7 @@ public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) String _taskConfig = taskConfig; _routingConfig = routingConfig; _queryConfig = queryConfig; + _realtimeConfig = realtimeConfig; _instanceAssignmentConfigMap = instanceAssignmentConfigMap; _fieldConfigList = fieldConfigList; _upsertConfig = upsertConfig; @@ -205,6 +241,7 @@ public TableConfig(TableConfig tableConfig) { _taskConfig = tableConfig.getTaskConfig(); _routingConfig = tableConfig.getRoutingConfig(); _queryConfig = tableConfig.getQueryConfig(); + _realtimeConfig = tableConfig.getRealtimeConfig(); _instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap(); _fieldConfigList = tableConfig.getFieldConfigList(); _upsertConfig = tableConfig.getUpsertConfig(); @@ -380,6 +417,16 @@ public void setQueryConfig(QueryConfig queryConfig) { _queryConfig = queryConfig; } + @JsonProperty(REALTIME_CONFIG_KEY) + @Nullable + public RealtimeConfig getRealtimeConfig() { + return _realtimeConfig; + } + + public void setRealtimeConfig(@Nullable RealtimeConfig realtimeConfig) { + _realtimeConfig = realtimeConfig; + } + @JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable public Map getInstanceAssignmentConfigMap() { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java index 3b316d337396..4651977fc985 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java @@ -33,6 +33,7 @@ import org.apache.pinot.spi.config.table.MultiColumnTextIndexConfig; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.QuotaConfig; +import 
org.apache.pinot.spi.config.table.RealtimeConfig; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; @@ -129,6 +130,7 @@ public class TableConfigBuilder { private TableTaskConfig _taskConfig; private RoutingConfig _routingConfig; private QueryConfig _queryConfig; + private RealtimeConfig _realtimeConfig; private List _tableSamplers; private Map _instanceAssignmentConfigMap; private Map _instancePartitionsMap; @@ -425,6 +427,11 @@ public TableConfigBuilder setQueryConfig(QueryConfig queryConfig) { return this; } + public TableConfigBuilder setRealtimeConfig(RealtimeConfig realtimeConfig) { + _realtimeConfig = realtimeConfig; + return this; + } + public TableConfigBuilder setTableSamplers(List tableSamplers) { _tableSamplers = tableSamplers; return this; @@ -568,7 +575,8 @@ public TableConfig build() { new TableConfig(_tableName, _tableType.toString(), validationConfig, tenantConfig, indexingConfig, _customConfig, _quotaConfig, _taskConfig, _routingConfig, _queryConfig, _instanceAssignmentConfigMap, _fieldConfigList, _upsertConfig, _dedupConfig, _dimensionTableConfig, _ingestionConfig, _tierConfigList, - _isDimTable, _tunerConfigList, _instancePartitionsMap, _segmentAssignmentConfigMap, _tableSamplers); + _isDimTable, _tunerConfigList, _instancePartitionsMap, _segmentAssignmentConfigMap, _tableSamplers, + _realtimeConfig); tableConfig.setDescription(_description); tableConfig.setTags(_tags); return tableConfig; diff --git a/pinot-tools/src/main/resources/examples/stream/consumingSegmentIndexProfile/README.md b/pinot-tools/src/main/resources/examples/stream/consumingSegmentIndexProfile/README.md new file mode 100644 index 000000000000..33cf70a0a9d6 --- /dev/null +++ b/pinot-tools/src/main/resources/examples/stream/consumingSegmentIndexProfile/README.md @@ -0,0 +1,77 @@ + +# Realtime consuming segment index profile + 
+`realtimeConfig.consumingSegmentIndexConfig` lets a realtime table use a richer index shape only while a segment is +mutable and consuming. The committed immutable segment still uses the persisted table config. + +Use this when recent data is queried heavily but long-term storage should stay compact. For example, keep `userId` RAW +on disk, but add a dictionary and inverted index in the consuming segment so equality filters on fresh rows are fast. + +Example query: + +```sql +SELECT COUNT(*) +FROM userEvents +WHERE userId = 'u123'; +``` + +In `userEvents_realtime_table_config.json`, the persisted shape is RAW: + +```json +"tableIndexConfig": { + "noDictionaryColumns": ["userId"] +}, +"fieldConfigList": [ + { + "name": "userId", + "encodingType": "RAW" + } +] +``` + +The consuming profile applies only to mutable consuming segments: + +```json +"realtimeConfig": { + "consumingSegmentIndexConfig": { + "fieldConfigList": [ + { + "name": "userId", + "encodingType": "DICTIONARY", + "indexes": { + "inverted": { + "enabled": true + } + } + } + ] + } +} +``` + +Supported fields in the first version are intentionally narrow: + +- `encodingType` +- `indexes.inverted` + +The profile is not a storage tier. `tierOverwrites` continue to apply only to immutable segments loaded on real segment +tiers. 
diff --git a/pinot-tools/src/main/resources/examples/stream/consumingSegmentIndexProfile/userEvents_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/consumingSegmentIndexProfile/userEvents_realtime_table_config.json new file mode 100644 index 000000000000..adaefc87bc8a --- /dev/null +++ b/pinot-tools/src/main/resources/examples/stream/consumingSegmentIndexProfile/userEvents_realtime_table_config.json @@ -0,0 +1,51 @@ +{ + "tableName": "userEvents", + "tableType": "REALTIME", + "tenants": {}, + "segmentsConfig": { + "timeColumnName": "tsMillis", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "7", + "replication": "1" + }, + "tableIndexConfig": { + "noDictionaryColumns": [ + "userId" + ] + }, + "fieldConfigList": [ + { + "name": "userId", + "encodingType": "RAW" + } + ], + "realtimeConfig": { + "consumingSegmentIndexConfig": { + "fieldConfigList": [ + { + "name": "userId", + "encodingType": "DICTIONARY", + "indexes": { + "inverted": { + "enabled": true + } + } + } + ] + } + }, + "ingestionConfig": { + "streamIngestionConfig": { + "streamConfigMaps": [ + { + "streamType": "kafka", + "stream.kafka.topic.name": "userEvents", + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", + "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory", + "stream.kafka.consumer.prop.auto.offset.reset": "smallest", + "stream.kafka.broker.list": "localhost:19092" + } + ] + } + } +}