diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 2dfc9d96540c..b0dae7dc59b2 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -226,7 +226,9 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { // HTTP thread utilization HTTP_THREAD_UTILIZATION("httpThreadUtilization", true), // Track the concurrent executions of the API resources that use @ManagedAsync - MANAGED_ASYNC_ACTIVE_THREADS("threads", true); + MANAGED_ASYNC_ACTIVE_THREADS("threads", true), + // Backfill circuit breaking: number of active backfill Kafka topics currently running for this table + BACKFILL_TOPICS_IN_PROGRESS("backfillTopicsInProgress", false); private final String _gaugeName; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java index d812d1c10595..c3183d418863 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java @@ -78,6 +78,13 @@ public enum ControllerMeter implements AbstractMetrics.Meter { PARTITION_GROUP_METADATA_FETCH_ERROR("failures", true), OFFSET_AUTO_RESET_SKIPPED_OFFSETS("autoResetSkippedOffsets", false), OFFSET_AUTO_RESET_BACKFILL_OFFSETS("autoResetBackfillOffsets", false), + OFFSET_AUTO_RESET_BACKFILL_SKIPPED_PAUSED("BackfillSkippedPaused", false), + OFFSET_AUTO_RESET_BACKFILL_SKIPPED_MAX_SEGMENTS("BackfillSkippedMaxSegments", false), + OFFSET_AUTO_RESET_BACKFILL_SKIPPED_MAX_CONCURRENT("BackfillSkippedMaxConcurrent", false), + OFFSET_AUTO_RESET_BACKFILL_SKIPPED_IN_FLIGHT("BackfillSkippedInFlight", false), + OFFSET_AUTO_RESET_HANDLER_INIT_FAILURE("BackfillHandlerInitFailure", false), + OFFSET_AUTO_RESET_AUTO_PAUSE_FAILURE("BackfillAutoPauseFailure", false), + OFFSET_AUTO_RESET_BACKFILL_CLEANUP_COMPLETED("BackfillCleanupCompleted", false), // Audit logging metrics AUDIT_REQUEST_FAILURES("failures", true), AUDIT_RESPONSE_FAILURES("failures", true), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 6c9fad642b4a..9a9bf8d5dfc5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -117,6 +117,10 @@ public static class ControllerPeriodicTasksConf { "controller.realtime.offsetAutoReset.backfill.frequencyPeriod"; public static final String REALTIME_OFFSET_AUTO_RESET_BACKFILL_INITIAL_DELAY_IN_SECONDS = "controller.realtime.offsetAutoReset.backfill.initialDelayInSeconds"; + public static final String MAX_CONCURRENT_BACKFILLS_PER_CONTROLLER = + "controller.realtime.offsetAutoReset.maxConcurrentBackfillsPerController"; + public static final String MAX_BACKFILL_COLLISIONS_BEFORE_AUTO_PAUSE = + "controller.realtime.offsetAutoReset.maxBackfillCollisionsBeforeAutoPause"; public static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD = "controller.broker.resource.validation.frequencyPeriod"; public static final String BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS = @@ -1208,6 +1212,14 @@ public long getRealtimeOffsetAutoResetBackfillInitialDelaySeconds() { getPeriodicTaskInitialDelayInSeconds()); } + public int getMaxConcurrentBackfillsPerController() { + return getProperty(ControllerPeriodicTasksConf.MAX_CONCURRENT_BACKFILLS_PER_CONTROLLER, -1); + } + + public int getMaxBackfillCollisionsBeforeAutoPause() { + return getProperty(ControllerPeriodicTasksConf.MAX_BACKFILL_COLLISIONS_BEFORE_AUTO_PAUSE, 3); + } + public boolean isDeepStoreRetryUploadLLCSegmentEnabled() { return getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, false); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index a81a9e161124..9ee44c4f8507 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1077,6 +1077,24 @@ private String computeStartOffset(String nextOffset, StreamConfig streamConfig, if (!streamConfig.isEnableOffsetAutoReset() || streamConfig.isBackfillTopic()) { return nextOffset; } + // Skip if backfill is manually paused for this topic + if (streamConfig.isOffsetAutoResetPaused()) { + LOGGER.info("Skipping offset auto reset for table {} topic {} — backfill is paused", + streamConfig.getTableNameWithType(), streamConfig.getTopicName()); + return nextOffset; + } + // Skip if the table already has too many segments (lightweight ZK child-name list, no data deserialization) + int maxSegments = streamConfig.getOffsetAutoResetMaxSegmentsBeforeSkip(); + if (maxSegments > 0) { + int segmentCount = ZKMetadataProvider.getSegments(_propertyStore, streamConfig.getTableNameWithType()).size(); + if (segmentCount >= maxSegments) { + LOGGER.info("Skipping offset auto reset for table {} topic {} — segment count {} >= maxSegmentsBeforeSkip {}", + streamConfig.getTableNameWithType(), streamConfig.getTopicName(), segmentCount, maxSegments); + _controllerMetrics.addMeteredTableValue(streamConfig.getTableNameWithType(), + ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_SKIPPED_MAX_SEGMENTS, 1L); + return nextOffset; + } + } long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold(); int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold(); if (timeThreshold <= 0 && offsetThreshold <= 0) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java index ab73ceab6b27..180eb6c67b94 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManager.java @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; @@ -37,6 +38,7 @@ import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; @@ -48,9 +50,13 @@ public class RealtimeOffsetAutoResetManager extends ControllerPeriodicTask _tableToHandler; - private final Map> _tableTopicsUnderBackfill; - private final Map> _tableBackfillTopics; + private final Map> _sourceTopicsByTable; + private final Map> _backfillTopicsByTable; + // Key: "tableNameWithType:topicName:partitionId" — counts how many times this partition triggered + // a backfill while a prior one for that same partition was still in flight. + private final Map _partitionsInFlightCount; public RealtimeOffsetAutoResetManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, @@ -60,9 +66,11 @@ public RealtimeOffsetAutoResetManager(ControllerConf config, PinotHelixResourceM leadControllerManager, controllerMetrics); _llcRealtimeSegmentManager = llcRealtimeSegmentManager; _pinotHelixResourceManager = pinotHelixResourceManager; + _controllerConf = config; _tableToHandler = new ConcurrentHashMap<>(); - _tableTopicsUnderBackfill = new ConcurrentHashMap<>(); - _tableBackfillTopics = new ConcurrentHashMap<>(); + _sourceTopicsByTable = new ConcurrentHashMap<>(); + _backfillTopicsByTable = new ConcurrentHashMap<>(); + _partitionsInFlightCount = new ConcurrentHashMap<>(); } @Override @@ -105,36 +113,78 @@ protected void processTable(String tableNameWithType, RealtimeOffsetAutoResetMan return; } + // Skip triggering if the controller is already handling the maximum number of concurrent backfills + int maxConcurrent = _controllerConf.getMaxConcurrentBackfillsPerController(); + if (context._shouldTriggerBackfillJobs && maxConcurrent > 0 + && _backfillTopicsByTable.size() >= maxConcurrent) { + LOGGER.warn("Skipping backfill trigger for table {} — max concurrent backfills ({}) reached", + tableNameWithType, maxConcurrent); + _controllerMetrics.addMeteredTableValue(tableNameWithType, + ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_SKIPPED_MAX_CONCURRENT, 1L); + context._shouldTriggerBackfillJobs = false; + } + if (context._shouldTriggerBackfillJobs) { - _tableTopicsUnderBackfill.putIfAbsent(tableNameWithType, ConcurrentHashMap.newKeySet()); + _sourceTopicsByTable.putIfAbsent(tableNameWithType, ConcurrentHashMap.newKeySet()); String topicName = context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_NAME); - _tableTopicsUnderBackfill.get(tableNameWithType).add(topicName); + String partitionStr = context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION); + String metricKey = topicName + "." + partitionStr; + _sourceTopicsByTable.get(tableNameWithType).add(topicName); + + // Per-partition in-flight guard: if this partition previously triggered a backfill that is + // still running, increment its collision counter. Auto-pause when the threshold is exceeded. + String partitionKey = tableNameWithType + ":" + topicName + ":" + partitionStr; + if (_partitionsInFlightCount.containsKey(partitionKey)) { + int collisions = _partitionsInFlightCount.merge(partitionKey, 1, Integer::sum); + int maxCollisions = _controllerConf.getMaxBackfillCollisionsBeforeAutoPause(); + _controllerMetrics.addMeteredTableValue(tableNameWithType, metricKey, + ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_SKIPPED_IN_FLIGHT, 1L); + LOGGER.warn("In-flight backfill collision #{} for partition key {}", collisions, partitionKey); + if (maxCollisions > 0 && collisions >= maxCollisions) { + LOGGER.warn("Auto-pausing backfill for table {} topic {} partition {} after {} collisions", + tableNameWithType, topicName, partitionStr, collisions); + _controllerMetrics.addMeteredTableValue(tableNameWithType, metricKey, + ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_SKIPPED_PAUSED, 1L); + setPauseFlag(tableNameWithType, tableConfig, topicName); + context._shouldTriggerBackfillJobs = false; + } + // Below threshold: allow trigger to proceed (new backfill coexists with the ongoing one) + } StreamConfig topicStreamConfig = IngestionConfigUtils.getStreamConfigs(tableConfig).stream() .filter(config -> topicName.equals(config.getTopicName())) .findFirst().orElseThrow(() -> new RuntimeException("No matching topic found")); - LOGGER.info("Triggering backfill jobs with StreamConfig {}, topicName {}, properties {}", - topicStreamConfig, topicName, context._backfillJobProperties); - try { - long startOffset = Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)); - long endOffset = Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)); - if (_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType, - topicStreamConfig, - topicName, - Integer.parseInt(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)), - startOffset, - endOffset)) { - _controllerMetrics.addMeteredTableValue(tableNameWithType, ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_OFFSETS, - endOffset - startOffset); + if (context._shouldTriggerBackfillJobs) { + LOGGER.info("Triggering backfill jobs with StreamConfig {}, topicName {}, properties {}", + topicStreamConfig, topicName, context._backfillJobProperties); + try { + long startOffset = Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)); + long endOffset = Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)); + if (_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType, + topicStreamConfig, + topicName, + Integer.parseInt(partitionStr), + startOffset, + endOffset)) { + // Mark this partition as having an in-flight backfill (collision count starts at 1) + _partitionsInFlightCount.put(partitionKey, 1); + _controllerMetrics.addMeteredTableValue(tableNameWithType, metricKey, + ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_OFFSETS, endOffset - startOffset); + } + } catch (NumberFormatException e) { + LOGGER.error("Invalid backfill job properties for table: {}, properties: {}, error: {}", + tableNameWithType, context._backfillJobProperties, e.getMessage(), e); } - } catch (NumberFormatException e) { - LOGGER.error("Invalid backfill job properties for table: {}, properties: {}, error: {}", - tableNameWithType, context._backfillJobProperties, e.getMessage(), e); } } ensureBackfillJobsRunning(tableNameWithType); ensureCompletedBackfillJobsCleanedUp(tableConfig); + + Set activeTopics = _backfillTopicsByTable.get(tableNameWithType); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, + ControllerGauge.BACKFILL_TOPICS_IN_PROGRESS, + activeTopics != null ? activeTopics.size() : 0L); } /** @@ -148,51 +198,79 @@ private void ensureBackfillJobsRunning(String tableNameWithType) { for (int i = 0; i < streamConfigs.size(); i++) { if (streamConfigs.get(i).isBackfillTopic() && !_llcRealtimeSegmentManager.isTopicConsumptionPaused( tableConfig.getTableName(), i)) { - _tableBackfillTopics.putIfAbsent(tableNameWithType, ConcurrentHashMap.newKeySet()); - _tableBackfillTopics.get(tableNameWithType).add(streamConfigs.get(i).getTopicName()); + _backfillTopicsByTable.putIfAbsent(tableNameWithType, ConcurrentHashMap.newKeySet()); + _backfillTopicsByTable.get(tableNameWithType).add(streamConfigs.get(i).getTopicName()); } } - if (!_tableTopicsUnderBackfill.containsKey(tableNameWithType) - || _tableTopicsUnderBackfill.get(tableNameWithType).isEmpty()) { + if (!_sourceTopicsByTable.containsKey(tableNameWithType) + || _sourceTopicsByTable.get(tableNameWithType).isEmpty()) { return; } RealtimeOffsetAutoResetHandler handler = getOrConstructHandler(tableConfig); if (handler == null) { return; } - handler.ensureBackfillJobsRunning(tableNameWithType, _tableTopicsUnderBackfill.get(tableNameWithType)); + handler.ensureBackfillJobsRunning(tableNameWithType, _sourceTopicsByTable.get(tableNameWithType)); } private void ensureCompletedBackfillJobsCleanedUp(TableConfig tableConfig) { String tableNameWithType = tableConfig.getTableName(); - if (!_tableBackfillTopics.containsKey(tableNameWithType)) { + if (!_backfillTopicsByTable.containsKey(tableNameWithType)) { return; } LOGGER.info("Trying to clean up backfill jobs on {}", tableNameWithType); RealtimeOffsetAutoResetHandler handler = getOrConstructHandler(tableConfig); Collection cleanedUpTopics = handler.cleanupCompletedBackfillJobs( - tableNameWithType, _tableBackfillTopics.get(tableNameWithType)); - if (cleanedUpTopics.containsAll(_tableBackfillTopics.get(tableNameWithType))) { - _tableTopicsUnderBackfill.remove(tableNameWithType); - _tableBackfillTopics.remove(tableNameWithType); + tableNameWithType, _backfillTopicsByTable.get(tableNameWithType)); + if (cleanedUpTopics.containsAll(_backfillTopicsByTable.get(tableNameWithType))) { + _sourceTopicsByTable.remove(tableNameWithType); + _backfillTopicsByTable.remove(tableNameWithType); + // Remove all per-partition collision counters for this table + _partitionsInFlightCount.keySet().removeIf(k -> k.startsWith(tableNameWithType + ":")); if (_tableToHandler.get(tableNameWithType) != null) { _tableToHandler.get(tableNameWithType).close(); _tableToHandler.remove(tableNameWithType); } } else { - _tableBackfillTopics.get(tableNameWithType).removeAll(cleanedUpTopics); + _backfillTopicsByTable.get(tableNameWithType).removeAll(cleanedUpTopics); } if (cleanedUpTopics.size() > 0) { LOGGER.info("Cleaned up complete backfill topics {} for table {}", cleanedUpTopics, tableNameWithType); + _controllerMetrics.addMeteredTableValue(tableNameWithType, + ControllerMeter.OFFSET_AUTO_RESET_BACKFILL_CLEANUP_COMPLETED, cleanedUpTopics.size()); } } @Override protected void nonLeaderCleanup(List tableNamesWithType) { for (String tableNameWithType : tableNamesWithType) { - _tableTopicsUnderBackfill.remove(tableNameWithType); - _tableBackfillTopics.remove(tableNameWithType); + _sourceTopicsByTable.remove(tableNameWithType); + _backfillTopicsByTable.remove(tableNameWithType); _tableToHandler.remove(tableNameWithType); + _partitionsInFlightCount.keySet().removeIf(k -> k.startsWith(tableNameWithType + ":")); + } + } + + private void setPauseFlag(String tableNameWithType, TableConfig tableConfig, String topicName) { + List> streamConfigMaps = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getStreamConfigMaps(); + for (Map map : streamConfigMaps) { + // Topic name is stored under the prefixed key "stream..topic.name" + String streamType = map.get(StreamConfigProperties.STREAM_TYPE); + String topicKey = + StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME); + if (topicName.equals(map.get(topicKey))) { + map.put(StreamConfigProperties.OFFSET_AUTO_RESET_PAUSE, "true"); + break; + } + } + try { + _pinotHelixResourceManager.updateTableConfig(tableConfig); + LOGGER.info("Set offset auto reset pause flag for table {} topic {}", tableNameWithType, topicName); + } catch (Exception e) { + LOGGER.error("Failed to set pause flag for table {} topic {}", tableNameWithType, topicName, e); + _controllerMetrics.addMeteredTableValue(tableNameWithType, + ControllerMeter.OFFSET_AUTO_RESET_AUTO_PAUSE_FAILURE, 1L); } } @@ -235,6 +313,8 @@ private RealtimeOffsetAutoResetHandler getOrConstructHandler(TableConfig tableCo return handler; } catch (Exception e) { LOGGER.error("Cannot create RealtimeOffsetAutoResetHandler", e); + _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), + ControllerMeter.OFFSET_AUTO_RESET_HANDLER_INIT_FAILURE, 1L); return null; } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManagerTest.java index d46d96d7f734..c5c9cdd38367 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeOffsetAutoResetManagerTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -46,6 +47,7 @@ import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -252,6 +254,120 @@ public void testGetOrConstructHandlerWithExistingHandler() { // We can verify this by checking that the process completes without exception } + // ---- Circuit Breaker Tests ---- + + @Test + public void testMaxConcurrentBackfillsSkipsNewTrigger() { + // Set max concurrent to 1 + _controllerConf.setProperty( + ControllerConf.ControllerPeriodicTasksConf.MAX_CONCURRENT_BACKFILLS_PER_CONTROLLER, "1"); + + String secondTableName = "anotherTable_REALTIME"; + TableConfig secondTableConfig = createTableConfigWithValidHandlerClass(secondTableName, "secondTopic"); + when(_pinotHelixResourceManager.getTableConfig(secondTableName)).thenReturn(secondTableConfig); + + // Trigger a backfill on the second table so _backfillTopicsByTable gets a "secondTopic" entry + // The TestRealtimeOffsetAutoResetHandler.cleanupCompletedBackfillJobs returns empty list, + // so ensureCompletedBackfillJobsCleanedUp will not clean it up. + // We need the handler to report active backfill topics via ensureBackfillJobsRunning. + // Simplest: configure the stream config of the second table with isBackfillTopic=true + // so ensureBackfillJobsRunning picks it up as an active backfill topic. + Map backfillStreamMap = new HashMap<>(); + backfillStreamMap.put("streamType", "kafka"); + backfillStreamMap.put("stream.kafka.topic.name", "secondTopic-backfill"); + backfillStreamMap.put("stream.kafka.consumer.type", "simple"); + backfillStreamMap.put("realtime.segment.isBackfillTopic", "true"); + backfillStreamMap.put("stream.kafka.decoder.class.name", "testDecoder"); + IngestionConfig ingestionConfig = secondTableConfig.getIngestionConfig(); + List> streamMaps = new java.util.ArrayList<>( + ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps()); + streamMaps.add(backfillStreamMap); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(streamMaps)); + ingestionConfig.getStreamIngestionConfig().setRealtimeOffsetAutoResetHandlerClass(TEST_HANDLER_CLASS_NAME); + when(_llcRealtimeSegmentManager.isTopicConsumptionPaused(secondTableName, 1)).thenReturn(false); + + // Run processTable on second table with no trigger to populate _backfillTopicsByTable via ensureBackfillJobsRunning + RealtimeOffsetAutoResetManager.Context noTriggerCtx = _realtimeOffsetAutoResetManager.preprocess(new Properties()); + _realtimeOffsetAutoResetManager.processTable(secondTableName, noTriggerCtx); + + // Now try to trigger backfill on the first table — should be blocked (1 table already backfilling) + TableConfig firstTableConfig = createTableConfigWithValidHandlerClass(); + when(_pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(firstTableConfig); + RealtimeOffsetAutoResetManager.Context firstCtx = _realtimeOffsetAutoResetManager.preprocess(_properties); + _realtimeOffsetAutoResetManager.processTable(REALTIME_TABLE_NAME, firstCtx); + + RealtimeOffsetAutoResetHandler handler = _realtimeOffsetAutoResetManager.getTableHandler(REALTIME_TABLE_NAME); + Assert.assertNotNull(handler); + Assert.assertFalse(((TestRealtimeOffsetAutoResetHandler) handler)._triggedBackfillJob, + "backfill should be skipped when max concurrent limit is reached"); + } + + @Test + public void testInFlightCollisionAtThresholdAutopauses() { + // threshold=1: on the 2nd trigger for the same partition, auto-pause fires + _controllerConf.setProperty( + ControllerConf.ControllerPeriodicTasksConf.MAX_BACKFILL_COLLISIONS_BEFORE_AUTO_PAUSE, "1"); + + Map mainStreamMap = new HashMap<>(); + mainStreamMap.put("streamType", "kafka"); + mainStreamMap.put("stream.kafka.topic.name", TOPIC_NAME); + mainStreamMap.put("stream.kafka.consumer.type", "simple"); + mainStreamMap.put("realtime.segment.offsetAutoReset.timeSecThreshold", "1800"); + mainStreamMap.put("stream.kafka.decoder.class.name", "testDecoder"); + + List> maps = Collections.singletonList(mainStreamMap); + IngestionConfig ingestionConfig = new IngestionConfig(); + StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(maps); + streamIngestionConfig.setRealtimeOffsetAutoResetHandlerClass(TEST_HANDLER_CLASS_NAME); + ingestionConfig.setStreamIngestionConfig(streamIngestionConfig); + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(REALTIME_TABLE_NAME).build(); + tableConfig.setIngestionConfig(ingestionConfig); + + when(_pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + try { + doNothing().when(_pinotHelixResourceManager).updateTableConfig(tableConfig); + } catch (Exception ignored) { + } + + // First trigger — succeeds, sets _partitionsInFlightCount[partition]=1 + RealtimeOffsetAutoResetManager.Context firstCtx = _realtimeOffsetAutoResetManager.preprocess(_properties); + _realtimeOffsetAutoResetManager.processTable(REALTIME_TABLE_NAME, firstCtx); + + TestRealtimeOffsetAutoResetHandler handler = + (TestRealtimeOffsetAutoResetHandler) _realtimeOffsetAutoResetManager.getTableHandler(REALTIME_TABLE_NAME); + Assert.assertNotNull(handler); + Assert.assertTrue(handler._triggedBackfillJob, "first trigger should succeed"); + + // Reset flag to detect whether second trigger fires + handler._triggedBackfillJob = false; + + // Second trigger for the same partition — collision #1 >= threshold=1 → auto-pause, skip trigger + RealtimeOffsetAutoResetManager.Context secondCtx = _realtimeOffsetAutoResetManager.preprocess(_properties); + _realtimeOffsetAutoResetManager.processTable(REALTIME_TABLE_NAME, secondCtx); + + Assert.assertFalse(handler._triggedBackfillJob, + "collision at threshold should skip the backfill trigger"); + Assert.assertEquals(mainStreamMap.get(StreamConfigProperties.OFFSET_AUTO_RESET_PAUSE), "true", + "pause flag should be set on the main topic stream config"); + } + + private TableConfig createTableConfigWithValidHandlerClass(String tableName, String topicName) { + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).build(); + IngestionConfig ingestionConfig = new IngestionConfig(); + Map streamConfigMap = new HashMap<>(); + streamConfigMap.put("streamType", "kafka"); + streamConfigMap.put("stream.kafka.topic.name", topicName); + streamConfigMap.put("stream.kafka.consumer.type", "simple"); + streamConfigMap.put("realtime.segment.offsetAutoReset.timeSecThreshold", "1800"); + streamConfigMap.put("stream.kafka.decoder.class.name", "testDecoder"); + StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(Collections.singletonList(streamConfigMap)); + streamIngestionConfig.setRealtimeOffsetAutoResetHandlerClass(TEST_HANDLER_CLASS_NAME); + ingestionConfig.setStreamIngestionConfig(streamIngestionConfig); + tableConfig.setIngestionConfig(ingestionConfig); + return tableConfig; + } + private TableConfig createTableConfigWithoutHandlerClass() { TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(REALTIME_TABLE_NAME).build(); IngestionConfig ingestionConfig = new IngestionConfig(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java index 7f502fd23f3d..7763917f3fe1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java @@ -438,6 +438,22 @@ public boolean isBackfillTopic() { return Boolean.TRUE.equals(_backfillTopic); } + public boolean isOffsetAutoResetPaused() { + return Boolean.parseBoolean(_streamConfigMap.get(StreamConfigProperties.OFFSET_AUTO_RESET_PAUSE)); + } + + public int getOffsetAutoResetMaxSegmentsBeforeSkip() { + String val = _streamConfigMap.get(StreamConfigProperties.OFFSET_AUTO_RESET_MAX_SEGMENTS_BEFORE_SKIP); + if (val == null) { + return -1; + } + try { + return Integer.parseInt(val); + } catch (NumberFormatException e) { + return -1; + } + } + public String getTableNameWithType() { return _tableNameWithType; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java index aa00ceb8870a..960c3519aa39 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java @@ -163,6 +163,19 @@ private StreamConfigProperties() { */ public static final String BACKFILL_TOPIC = "realtime.segment.isBackfillTopic"; + /** + * When true, skip triggering offset auto-reset backfill for this stream/topic. + * Can be set per stream config entry (per topic). Cleared by the operator via normal table config update. + */ + public static final String OFFSET_AUTO_RESET_PAUSE = "realtime.segment.offsetAutoReset.pause"; + + /** + * If the segment count for the table >= this value, skip triggering a backfill. + * -1 or absent means disabled. Prevents znode limit pressure when ingestion is permanently high. + */ + public static final String OFFSET_AUTO_RESET_MAX_SEGMENTS_BEFORE_SKIP = + "realtime.segment.offsetAutoReset.maxSegmentsBeforeBackfillSkip"; + /** * Helper method to create a stream specific property */