Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.UpsertContext;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
Expand Down Expand Up @@ -1821,9 +1822,12 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_isOffHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
_defaultNullHandlingEnabled = indexingConfig.isNullHandlingEnabled();

// Start new realtime segment
// Start new realtime segment. The dispatch helper keeps the existing IndexLoadingConfig-driven path when no
// consuming tier overrides are configured, and builds a mutable-only tier-overwritten view when they are.
String consumerDir = realtimeTableDataManager.getConsumerDir();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig);
realtimeSegmentConfigBuilder
.setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentNameStr)
.setStreamName(streamTopic)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.integration.tests.BaseClusterIntegrationTest;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -305,6 +306,11 @@ protected BaseControllerStarter getSharedControllerStarter() {
return _sharedClusterTestSuite._controllerStarter;
}

/// Returns server starters from the shared suite instance.
protected List<BaseServerStarter> getSharedServerStarters() {
return _sharedClusterTestSuite._serverStarters;
}

/**
* Returns the property store from the shared suite instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.IngestionUtils;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
Expand Down Expand Up @@ -180,9 +181,11 @@ public StatelessRealtimeSegmentWriter(SegmentZKMetadata segmentZKMetadata, Index
File statsHistoryFile = new File(tableDataDir, SEGMENT_STATS_FILE_NAME);
RealtimeSegmentStatsHistory statsHistory = RealtimeSegmentStatsHistory.deserializeFrom(statsHistoryFile);

// Initialize mutable segment with configurations
// Initialize mutable segment with configurations.
IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = new RealtimeSegmentConfig.Builder(indexLoadingConfig)
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
TableConfigUtils.buildConsumingSegmentConfigBuilder(_tableConfig, _schema, indexLoadingConfig);
realtimeSegmentConfigBuilder
.setTableNameWithType(_tableNameWithType)
.setSegmentName(_segmentName)
.setStreamName(_streamConfig.getTopicName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ public void refreshIndexConfigs() {
}

private TableConfig getTableConfigWithTierOverwrites() {
return (_segmentTier == null || _tableConfig == null) ? _tableConfig
return (_segmentTier == null || _tableConfig == null
|| TableConfigUtils.isSyntheticConsumingSegmentTier(_tableConfig, _segmentTier))
? _tableConfig
: TableConfigUtils.overwriteTableConfigForTier(_tableConfig, _segmentTier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.segment.local.aggregator.ValueAggregator;
import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.recordtransformer.SchemaConformingTransformer;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
Expand Down Expand Up @@ -134,6 +136,12 @@ private TableConfigUtils() {
// this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use KinesisConfig.STREAM_TYPE directly, we
// hardcode the value here to avoid pulling the entire pinot-kinesis module as dependency.
private static final String KINESIS_STREAM_TYPE = "kinesis";
private static final String CONSUMING_SEGMENT_TIER = "consuming";
private static final Set<String> CONSUMING_SEGMENT_TIER_INDEXING_CONFIG_KEYS = ImmutableSet.of(
"invertedIndexColumns", "rangeIndexColumns", "rangeIndexVersion", "jsonIndexColumns", "jsonIndexConfigs",
"sortedColumn", "bloomFilterColumns", "bloomFilterConfigs", "noDictionaryColumns", "noDictionaryConfig",
"onHeapDictionaryColumns", "varLengthDictionaryColumns", "optimizeDictionary", "optimizeDictionaryForMetrics",
"optimizeDictionaryType", "noDictionarySizeRatioThreshold", "noDictionaryCardinalityRatioThreshold");

private static final Set<String> UPSERT_DEDUP_ALLOWED_ROUTING_STRATEGIES =
ImmutableSet.of(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
Expand Down Expand Up @@ -187,6 +195,7 @@ public static void validate(TableConfig tableConfig, Schema schema, @Nullable St
}
validateTierConfigList(tableConfig.getTierConfigsList());
validateIndexingConfigAndFieldConfigList(tableConfig, schema);
validateConsumingTierOverwrites(tableConfig, schema);
validateInstancePartitionsTypeMapConfig(tableConfig);
validatePartitionedReplicaGroupInstance(tableConfig);
validateInstanceAssignmentConfigs(tableConfig);
Expand Down Expand Up @@ -2126,6 +2135,121 @@ private static void overwriteConfig(JsonNode oldCfg, JsonNode newCfg) {
}
}

public static boolean isConsumingSegmentTier(@Nullable String tier) {
return CONSUMING_SEGMENT_TIER.equals(tier);
}

public static boolean isSyntheticConsumingSegmentTier(@Nullable TableConfig tableConfig, @Nullable String tier) {
return isConsumingSegmentTier(tier) && !hasTierConfig(tableConfig, tier);
}

private static void validateConsumingTierOverwrites(TableConfig tableConfig, Schema schema) {
if (tableConfig.getTableType() != TableType.REALTIME
|| !hasTierOverwritesForTier(tableConfig, CONSUMING_SEGMENT_TIER)
|| hasTierConfig(tableConfig, CONSUMING_SEGMENT_TIER)) {
return;
}
validateConsumingTierOverwriteScope(tableConfig);
TableConfig consumingTableConfig = overwriteTableConfigForTier(tableConfig, CONSUMING_SEGMENT_TIER);
Preconditions.checkState(consumingTableConfig != tableConfig,
"Failed to apply tierOverwrites.%s for table: %s", CONSUMING_SEGMENT_TIER, tableConfig.getTableName());
try {
validateIndexingConfigAndFieldConfigList(consumingTableConfig, schema);
} catch (RuntimeException e) {
throw new IllegalStateException(
"tierOverwrites." + CONSUMING_SEGMENT_TIER + " produces an invalid mutable consuming segment config: "
+ e.getMessage(), e);
}
}

private static void validateConsumingTierOverwriteScope(TableConfig tableConfig) {
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
if (indexingConfig == null) {
return;
}
JsonNode consumingIndexingOverwrite = getTierOverwrite(indexingConfig.getTierOverwrites(), CONSUMING_SEGMENT_TIER);
if (consumingIndexingOverwrite == null) {
return;
}
Preconditions.checkState(consumingIndexingOverwrite.isObject(),
"tableIndexConfig.tierOverwrites.%s must be a JSON object", CONSUMING_SEGMENT_TIER);
Iterator<String> keys = consumingIndexingOverwrite.fieldNames();
while (keys.hasNext()) {
String key = keys.next();
Preconditions.checkState(CONSUMING_SEGMENT_TIER_INDEXING_CONFIG_KEYS.contains(key),
"Unsupported tableIndexConfig.tierOverwrites.%s key: %s; supported keys: %s", CONSUMING_SEGMENT_TIER,
key, CONSUMING_SEGMENT_TIER_INDEXING_CONFIG_KEYS);
}
}

/// Builds the [RealtimeSegmentConfig.Builder] for a mutable consuming segment. If the table config contains
/// `tierOverwrites.consuming` under `tableIndexConfig` or a `fieldConfigList` entry, the builder is created from the
/// existing tier-overwrite view for that synthetic tier. Only index-loading settings are supported for
/// `tableIndexConfig.tierOverwrites.consuming`; settings that control row shape or ingestion behavior must stay on
/// the persisted table config. If `consuming` is already configured as a real storage tier, storage-tier semantics
/// take precedence for backward compatibility. The original [IndexLoadingConfig] is left untouched so the commit path
/// and immutable segment load path continue to use the persisted table config and real segment tier.
public static RealtimeSegmentConfig.Builder buildConsumingSegmentConfigBuilder(TableConfig tableConfig, Schema schema,
IndexLoadingConfig indexLoadingConfig) {
if (!hasTierOverwritesForTier(tableConfig, CONSUMING_SEGMENT_TIER)
|| hasTierConfig(tableConfig, CONSUMING_SEGMENT_TIER)) {
return new RealtimeSegmentConfig.Builder(indexLoadingConfig);
}
TableConfig consumingTableConfig = overwriteTableConfigForTier(tableConfig, CONSUMING_SEGMENT_TIER);
if (consumingTableConfig == tableConfig) {
return new RealtimeSegmentConfig.Builder(indexLoadingConfig);
}
Schema schemaCopy;
try {
schemaCopy = JsonUtils.jsonNodeToObject(schema.toJsonObject(), Schema.class);
} catch (IOException e) {
throw new IllegalStateException(
"Failed to clone schema while applying consuming tier overrides for table: " + tableConfig.getTableName(), e);
}
IndexLoadingConfig consumingIndexLoadingConfig =
new IndexLoadingConfig(indexLoadingConfig.getInstanceDataManagerConfig(), consumingTableConfig, schemaCopy);
return new RealtimeSegmentConfig.Builder(consumingIndexLoadingConfig);
}

private static boolean hasTierOverwritesForTier(TableConfig tableConfig, String tier) {
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
if (indexingConfig != null && hasTierOverwrite(indexingConfig.getTierOverwrites(), tier)) {
return true;
}
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
if (fieldConfigList == null) {
return false;
}
for (FieldConfig fieldConfig : fieldConfigList) {
if (hasTierOverwrite(fieldConfig.getTierOverwrites(), tier)) {
return true;
}
}
return false;
}

private static boolean hasTierConfig(@Nullable TableConfig tableConfig, String tier) {
if (tableConfig == null || tableConfig.getTierConfigsList() == null) {
return false;
}
for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
if (tier.equals(tierConfig.getName())) {
return true;
}
}
return false;
}

private static boolean hasTierOverwrite(@Nullable JsonNode tierOverwrites, String tier) {
JsonNode tierOverwrite = getTierOverwrite(tierOverwrites, tier);
return tierOverwrite != null && !tierOverwrite.isNull();
}

@Nullable
private static JsonNode getTierOverwrite(@Nullable JsonNode tierOverwrites, String tier) {
return tierOverwrites != null && tierOverwrites.isObject() ? tierOverwrites.get(tier) : null;
}

/**
* Get the partition column from tableConfig instance assignment config map.
* @param tableConfig table config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
Expand All @@ -32,6 +33,7 @@
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -157,6 +159,72 @@ public void testCalculateIndexConfigsWithTierOverwrites()
assertFalse(fieldCfgs.getConfig(StandardIndexes.dictionary()).isEnabled());
}

@Test
public void testSyntheticConsumingTierIsNotAppliedAsStorageTier()
throws IOException {
Schema schema =
new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("col1", FieldSpec.DataType.STRING)
.build();
FieldConfig col1Cfg = JsonUtils.stringToObject("{"
+ " \"name\": \"col1\","
+ " \"encodingType\": \"RAW\","
+ " \"tierOverwrites\": {"
+ " \"consuming\": {"
+ " \"encodingType\": \"DICTIONARY\","
+ " \"indexes\": {"
+ " \"inverted\": {\"enabled\": \"true\"}"
+ " }"
+ " }"
+ " }"
+ "}", FieldConfig.class);
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setNoDictionaryColumns(List.of("col1"))
.setTierOverwrites(JsonUtils.stringToJsonNode("{\"consuming\": {\"noDictionaryColumns\": []}}"))
.setFieldConfigList(List.of(col1Cfg))
.build();

IndexLoadingConfig ilc = new IndexLoadingConfig(tableConfig, schema);
ilc.setSegmentTier("consuming");

FieldIndexConfigs fieldCfgs = ilc.getFieldIndexConfig("col1");
assertFalse(fieldCfgs.getConfig(StandardIndexes.dictionary()).isEnabled());
assertFalse(fieldCfgs.getConfig(StandardIndexes.inverted()).isEnabled());
}

@Test
public void testRealConsumingStorageTierIsAppliedAsStorageTier()
throws IOException {
Schema schema =
new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("col1", FieldSpec.DataType.STRING)
.build();
FieldConfig col1Cfg = JsonUtils.stringToObject("{"
+ " \"name\": \"col1\","
+ " \"encodingType\": \"RAW\","
+ " \"tierOverwrites\": {"
+ " \"consuming\": {"
+ " \"encodingType\": \"DICTIONARY\","
+ " \"indexes\": {"
+ " \"inverted\": {\"enabled\": \"true\"}"
+ " }"
+ " }"
+ " }"
+ "}", FieldConfig.class);
TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
.setNoDictionaryColumns(List.of("col1"))
.setTierOverwrites(JsonUtils.stringToJsonNode("{\"consuming\": {\"noDictionaryColumns\": []}}"))
.setFieldConfigList(List.of(col1Cfg))
.setTierConfigList(List.of(new TierConfig("consuming", TierFactory.TIME_SEGMENT_SELECTOR_TYPE, "30d", null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "consuming_tag_REALTIME", null, null)))
.build();

IndexLoadingConfig ilc = new IndexLoadingConfig(tableConfig, schema);
ilc.setSegmentTier("consuming");

FieldIndexConfigs fieldCfgs = ilc.getFieldIndexConfig("col1");
assertTrue(fieldCfgs.getConfig(StandardIndexes.dictionary()).isEnabled());
assertTrue(fieldCfgs.getConfig(StandardIndexes.inverted()).isEnabled());
}

@Test
public void testCalculateForwardIndexConfig()
throws JsonProcessingException {
Expand Down
Loading
Loading