diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index ab20f6fb7453..96fca237aac1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -474,8 +474,10 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon if (tableType == TableType.REALTIME && tableConfig != null) { List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); + boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); + new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, - streamConfigs, idealState).findAndEmitMetrics(idealState); + streamConfigs, idealState, multitopicSkipMissingTopics).findAndEmitMetrics(idealState); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 6ec48830ca71..cb0ca4b91ff4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -89,12 +89,15 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n * and is created using the latest segment zk metadata. * @param pausedTopicIndices List of inactive topic indices. Index is the index of the topic in the streamConfigMaps. * @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset + * @param multitopicSkipMissingTopics - In multitopic tables whether to skip topics that don't exist + * during partition metadata fetch */ public static List getPartitionGroupMetadataList(List streamConfigs, List partitionGroupConsumptionStatusList, List pausedTopicIndices, - boolean forceGetOffsetFromStream) { + boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) { PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new PartitionGroupMetadataFetcher( - streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream); + streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream, + multitopicSkipMissingTopics); try { DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index 99bee6f8a7fe..705eb7f05367 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -69,7 +69,8 @@ public class MissingConsumingSegmentFinder { private ControllerMetrics _controllerMetrics; public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore propertyStore, - ControllerMetrics controllerMetrics, List streamConfigs, IdealState idealState) { + ControllerMetrics controllerMetrics, List streamConfigs, IdealState idealState, + boolean multitopicSkipMissingTopics) { _realtimeTableName = realtimeTableName; _controllerMetrics = controllerMetrics; _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics); @@ -85,7 +86,8 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt try { PauseState pauseState = PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState); PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), - pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false) + pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false, + multitopicSkipMissingTopics) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 55252182dd6a..a3bc1683db72 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -378,8 +378,10 @@ public void stop() { */ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); + boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); List> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState).stream().map( + getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState, + multitopicSkipMissingTopics).stream().map( x -> Pair.of(x, STARTING_SEQUENCE_NUMBER) ).collect(Collectors.toList()); setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList); @@ -811,7 +813,8 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId(); List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); - Set partitionIds = getPartitionIds(streamConfigs, idealState); + boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); + Set partitionIds = getPartitionIds(streamConfigs, idealState, multitopicSkipMissingTopics); if (partitionIds.contains(committingSegmentPartitionGroupId)) { String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); @@ -1136,7 +1139,8 @@ Set getPartitionIds(StreamConfig streamConfig) } @VisibleForTesting - Set getPartitionIds(List streamConfigs, IdealState idealState) { + Set getPartitionIds(List streamConfigs, IdealState idealState, + boolean multitopicSkipMissingTopics) { Set partitionIds = new HashSet<>(); boolean allPartitionIdsFetched = true; int numStreams = streamConfigs.size(); @@ -1184,7 +1188,8 @@ Set getPartitionIds(List streamConfigs, IdealState idealS List currentPartitionGroupConsumptionStatusList = getPartitionGroupConsumptionStatusList(idealState, streamConfigs); List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState); + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, + multitopicSkipMissingTopics); partitionIds.addAll(newPartitionGroupMetadataList.stream() .map(PartitionGroupMetadata::getPartitionGroupId) .collect(Collectors.toSet())); @@ -1199,9 +1204,10 @@ Set getPartitionIds(List streamConfigs, IdealState idealS */ @VisibleForTesting List getNewPartitionGroupMetadataList(List streamConfigs, - List currentPartitionGroupConsumptionStatusList, IdealState idealState) { + List currentPartitionGroupConsumptionStatusList, IdealState idealState, + boolean multitopicSkipMissingTopics) { return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, - false); + false, multitopicSkipMissingTopics); } /** @@ -1212,11 +1218,12 @@ List getNewPartitionGroupMetadataList(List @VisibleForTesting List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList, IdealState idealState, - boolean forceGetOffsetFromStream) { + boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) { PauseState pauseState = extractTablePauseState(idealState); return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, - pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream); + pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream, + multitopicSkipMissingTopics); } /** @@ -1388,8 +1395,11 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, List streamConfig.setOffsetCriteria( offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); + boolean multitopicSkipMissingTopics = + IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState); + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, + multitopicSkipMissingTopics); streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, offsetCriteria); @@ -1754,10 +1764,11 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List fetchPartitionGroupIdToSmallestOffset(List streamConfigs, - IdealState idealState) { + IdealState idealState, boolean multitopicSkipMissingTopics) { Map partitionGroupIdToSmallestOffset = new HashMap<>(); for (StreamConfig streamConfig : streamConfigs) { List currentPartitionGroupConsumptionStatusList = @@ -1875,7 +1886,7 @@ private Map fetchPartitionGroupIdToSmallestOf // The kafka implementation of computePartitionGroupMetadata() will ignore the current status // while the kinesis implementation will use it. List partitionGroupMetadataList = getNewPartitionGroupMetadataList( - streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, true); + streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, true, multitopicSkipMissingTopics); streamConfig.setOffsetCriteria(originalOffsetCriteria); for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 94f3abba16e9..90cfe228a17a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -299,7 +299,7 @@ public void testCommitSegment() { // committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList List partitionGroupMetadataListWithout0 = segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(), - mock(IdealState.class)); + mock(IdealState.class), false); partitionGroupMetadataListWithout0.remove(0); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout0; @@ -853,7 +853,7 @@ public void testRepairs() { // 1 reached end of shard. List partitionGroupMetadataListWithout1 = segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(), - mock(IdealState.class)); + mock(IdealState.class), false); partitionGroupMetadataListWithout1.remove(1); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1; // noop @@ -1622,7 +1622,7 @@ public void testGetPartitionIds() segmentManager._numPartitions = 2; // Test empty ideal state - Set partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState); + Set partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState, false); Assert.assertEquals(partitionIds.size(), 2); partitionIds.clear(); @@ -1638,8 +1638,8 @@ public void testGetPartitionIds() List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)), new PartitionGroupMetadata(1, new LongMsgOffset(345))); doReturn(partitionGroupMetadataList).when(segmentManagerSpy) - .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList, idealState); - partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState); + .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList, idealState, false); + partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState, false); Assert.assertEquals(partitionIds.size(), 2); } @@ -2024,7 +2024,8 @@ public void setUpNewTable() { public void ensureAllPartitionsConsuming() { ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState, - getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList(), mock(IdealState.class)), null); + getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList(), mock(IdealState.class), false), + null); } @Override @@ -2106,7 +2107,8 @@ Set getPartitionIds(StreamConfig streamConfig) { @Override List getNewPartitionGroupMetadataList(List streamConfigs, - List currentPartitionGroupConsumptionStatusList, IdealState idealState) { + List currentPartitionGroupConsumptionStatusList, IdealState idealState, + boolean multitopicSkipMissingTopics) { if (_partitionGroupMetadataList != null) { return _partitionGroupMetadataList; } else { @@ -2118,8 +2120,9 @@ List getNewPartitionGroupMetadataList(List @Override List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList, IdealState idealState, - boolean forceGetOffsetFromStream) { - return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState); + boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) { + return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, + multitopicSkipMissingTopics); } @Override diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index dda082c80d29..334a055d213f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -62,6 +62,10 @@ public class StreamIngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Class to handle realtime offset auto reset") private String _realtimeOffsetAutoResetHandlerClass; + @JsonPropertyDescription("Multitopic Tables: If true, non-existent topics will be skipped instead of " + + "causing failures.") + private boolean _multitopicSkipMissingTables; + @JsonCreator public StreamIngestionConfig(@JsonProperty("streamConfigMaps") List> streamConfigMaps) { _streamConfigMaps = streamConfigMaps; @@ -136,4 +140,12 @@ public String getRealtimeOffsetAutoResetHandlerClass() { public void setRealtimeOffsetAutoResetHandlerClass(String realtimeOffsetAutoResetHandlerClass) { _realtimeOffsetAutoResetHandlerClass = realtimeOffsetAutoResetHandlerClass; } + + public boolean isMultitopicSkipMissingTables() { + return _multitopicSkipMissingTables; + } + + public void setMultitopicSkipMissingTables(boolean multitopicSkipMissingTables) { + _multitopicSkipMissingTables = multitopicSkipMissingTables; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 698ad472e1a1..b502510a26c5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.stream.Collectors; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -37,6 +38,7 @@ public class PartitionGroupMetadataFetcher implements Callable { private final List _streamConfigs; private final List _partitionGroupConsumptionStatusList; private final boolean _forceGetOffsetFromStream; + private final boolean _multitopicSkipMissingTopics; private final List _newPartitionGroupMetadataList = new ArrayList<>(); private final List _pausedTopicIndices; @@ -44,10 +46,11 @@ public class PartitionGroupMetadataFetcher implements Callable { public PartitionGroupMetadataFetcher(List streamConfigs, List partitionGroupConsumptionStatusList, List pausedTopicIndices, - boolean forceGetOffsetFromStream) { + boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) { _streamConfigs = streamConfigs; _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; _forceGetOffsetFromStream = forceGetOffsetFromStream; + _multitopicSkipMissingTopics = multitopicSkipMissingTopics; _pausedTopicIndices = pausedTopicIndices; } @@ -102,6 +105,11 @@ private Boolean fetchSingleStream() private Boolean fetchMultipleStreams() throws Exception { int numStreams = _streamConfigs.size(); + + // For multi topic tables - Fetch available topics once and reuse across all streams + // (for topic existence validation) + Set availableTopicNames = fetchAvailableTopicNames(); + for (int i = 0; i < numStreams; i++) { if (_pausedTopicIndices.contains(i)) { LOGGER.info("Skipping fetching PartitionGroupMetadata for paused topic: {}", @@ -122,6 +130,14 @@ private Boolean fetchMultipleStreams() .collect(Collectors.toList()); try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( StreamConsumerFactory.getUniqueClientId(clientId))) { + + // Check if the topic exists before fetching partition metadata + // Only perform this check if topic existence validation is enabled and topics were fetched + if (_multitopicSkipMissingTopics && availableTopicNames != null && !availableTopicNames.contains(topicName)) { + LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); + continue; + } + _newPartitionGroupMetadataList.addAll( streamMetadataProvider.computePartitionGroupMetadata(clientId, streamConfig, topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, @@ -147,4 +163,34 @@ private Boolean fetchMultipleStreams() } return Boolean.TRUE; } + + /** + * Fetches available topic names from the stream provider. + * Uses the first stream config to fetch topics. + * + * @return Set of available topic names, or null if topics could not be fetched or skip missing topics is disabled + */ + private Set fetchAvailableTopicNames() { + if (!_multitopicSkipMissingTopics || _streamConfigs.isEmpty()) { + return null; + } + + StreamConfig streamConfigForTopicFetch = _streamConfigs.get(0); + + String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-topicFetch-" + + streamConfigForTopicFetch.getTableNameWithType(); + StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfigForTopicFetch); + + try (StreamMetadataProvider provider = factory.createStreamMetadataProvider(clientId)) { + return provider.getTopics().stream() + .map(StreamMetadataProvider.TopicMetadata::getName) + .collect(Collectors.toSet()); + } catch (UnsupportedOperationException e) { + LOGGER.debug("getTopics() not supported for stream type, skipping topic existence validation"); + return null; + } catch (Exception e) { + LOGGER.warn("Failed to fetch available topics, skipping topic existence validation", e); + return null; + } + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java index 762720ab59bc..f823a62dc4d3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java @@ -48,6 +48,7 @@ private StreamConfigProperties() { public static final String GROUP_ID = "hlc.group.id"; public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = "partition.offset.factory.class.name"; public static final String TOPIC_CONSUMPTION_RATE_LIMIT = "topic.consumption.rate.limit"; + public static final String MULTITOPIC_SKIP_MISSING_TOPICS = "multitopic.skip.missing.topics"; public static final String METADATA_POPULATE = "metadata.populate"; /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 40f647c429cd..f4f45144a94d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -351,4 +351,17 @@ public static Map> getStreamConfigIndexToStreamPartitions( } return streamIndexToPartitions; } + + /** + * Returns whether to skip missing topics during partition metadata fetch. + * @param tableConfig the table config + * @return true if missing topics should be skipped, false otherwise + */ + public static boolean getMultitopicSkipMissingTablesFlag(TableConfig tableConfig) { + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig != null && ingestionConfig.getStreamIngestionConfig() != null) { + return ingestionConfig.getStreamIngestionConfig().isMultitopicSkipMissingTables(); + } + return false; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index 9fa65254b636..8ac9e8c85f0c 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -20,7 +20,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -63,7 +65,7 @@ public void testFetchSingleStreamSuccess() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -97,7 +99,7 @@ public void testFetchSingleStreamTransientException() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -124,6 +126,8 @@ public void testFetchMultipleStreams() PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1, mock(StreamPartitionMsgOffset.class)); StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + // Mock getTopics() to throw UnsupportedOperationException (bypasses topic existence check) + when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), any(List.class), anyInt(), anyBoolean())) .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2)); @@ -136,7 +140,7 @@ public void testFetchMultipleStreams() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -174,6 +178,8 @@ public void testFetchMultipleStreamsWithPause() PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1, mock(StreamPartitionMsgOffset.class)); StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + // Mock getTopics() to throw UnsupportedOperationException (bypasses topic existence check) + when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), any(List.class), anyInt(), anyBoolean())) .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2)); @@ -186,7 +192,7 @@ public void testFetchMultipleStreamsWithPause() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Arrays.asList(1), false); + streamConfigs, statusList, Arrays.asList(1), false, false); // Execute Boolean result = fetcher.call(); @@ -207,10 +213,300 @@ public void testFetchMultipleStreamsWithPause() } } - private StreamConfig createMockStreamConfig(String topicName, String tableName, boolean isEphemeral) { + @Test + public void testFetchMultipleStreamsWithExceptionThrows() + throws Exception { + // Setup: 3 streams where the second one throws an exception + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); + StreamConfig streamConfig2 = createMockStreamConfig("topic2-failing", "test-table", false); + StreamConfig streamConfig3 = createMockStreamConfig("topic3", "test-table", false); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2, streamConfig3); + + PartitionGroupConsumptionStatus status1 = new PartitionGroupConsumptionStatus(0, 0, null, null, "IN_PROGRESS"); + List statusList = Collections.singletonList(status1); + + PartitionGroupMetadata mockedMetadata = new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class)); + + // Create separate metadata providers for each stream + StreamMetadataProvider successProvider1 = mock(StreamMetadataProvider.class); + when(successProvider1.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); + when(successProvider1.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt(), anyBoolean())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamMetadataProvider failingProvider = mock(StreamMetadataProvider.class); + when(failingProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); + when(failingProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt(), anyBoolean())) + .thenThrow(new RuntimeException("Failed to fetch partition count for topic2-failing")); + + StreamMetadataProvider successProvider3 = mock(StreamMetadataProvider.class); + when(successProvider3.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); + when(successProvider3.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt(), anyBoolean())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamConsumerFactory factory1 = mock(StreamConsumerFactory.class); + when(factory1.createStreamMetadataProvider(anyString())).thenReturn(successProvider1); + + StreamConsumerFactory factory2 = mock(StreamConsumerFactory.class); + when(factory2.createStreamMetadataProvider(anyString())).thenReturn(failingProvider); + + StreamConsumerFactory factory3 = mock(StreamConsumerFactory.class); + when(factory3.createStreamMetadataProvider(anyString())).thenReturn(successProvider3); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig1)).thenReturn(factory1); + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig2)).thenReturn(factory2); + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig3)).thenReturn(factory3); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList(), false, false); + + // Execute and verify exception is thrown + try { + fetcher.call(); + Assert.fail("Expected RuntimeException to be thrown"); + } catch (RuntimeException e) { + // Verify: exception should contain topic2-failing + Assert.assertTrue(e.getMessage().contains("topic2-failing")); + // Verify: exception should also be recorded + Assert.assertNotNull(fetcher.getException()); + Assert.assertEquals(e, fetcher.getException()); + } + + // Verify: only metadata from topic1 should be collected (before exception) + List resultMetadata = fetcher.getPartitionGroupMetadataList(); + Assert.assertEquals(resultMetadata.size(), 1); + Assert.assertEquals(resultMetadata.get(0).getPartitionGroupId(), 0); + } + } + + @Test + public void testFetchMultipleStreamsTransientExceptionStopsProcessing() + throws Exception { + // Setup: TransientConsumerException should cause immediate return with FALSE + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); + StreamConfig streamConfig2 = createMockStreamConfig("topic2", "test-table", false); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2); + + List statusList = Collections.emptyList(); + + StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); + when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt(), anyBoolean())) + .thenThrow(new TransientConsumerException(new RuntimeException("Transient error"))); + + StreamConsumerFactory factory = mock(StreamConsumerFactory.class); + when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList(), false, false); + + // Execute + Boolean result = fetcher.call(); + + // Verify: TransientConsumerException should return FALSE immediately + Assert.assertFalse(result); + Assert.assertTrue(fetcher.getException() instanceof TransientConsumerException); + } + } + + @Test + public void testFetchMultipleStreamsSkipsNonExistentTopic() + throws Exception { + // Setup: 3 streams where the second topic does not exist + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", true); + StreamConfig streamConfig2 = createMockStreamConfig("topic2-nonexistent", "test-table", true); + StreamConfig streamConfig3 = createMockStreamConfig("topic3", "test-table", true); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2, streamConfig3); + + List statusList = Collections.emptyList(); + + PartitionGroupMetadata mockedMetadata = new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class)); + + // Create topic metadata for existing topics only (topic1 and topic3) + StreamMetadataProvider.TopicMetadata topic1Metadata = mock(StreamMetadataProvider.TopicMetadata.class); + when(topic1Metadata.getName()).thenReturn("topic1"); + StreamMetadataProvider.TopicMetadata topic3Metadata = mock(StreamMetadataProvider.TopicMetadata.class); + when(topic3Metadata.getName()).thenReturn("topic3"); + + // Provider for topic1 - topic exists + StreamMetadataProvider provider1 = mock(StreamMetadataProvider.class); + when(provider1.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic3Metadata)); + when(provider1.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt(), anyBoolean())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + // Provider for topic2 - topic does NOT exist (getTopics returns list without topic2) + StreamMetadataProvider provider2 = mock(StreamMetadataProvider.class); + when(provider2.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic3Metadata)); + + // Provider for topic3 - topic exists + StreamMetadataProvider provider3 = mock(StreamMetadataProvider.class); + when(provider3.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic3Metadata)); + when(provider3.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt(), anyBoolean())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamConsumerFactory factory1 = mock(StreamConsumerFactory.class); + when(factory1.createStreamMetadataProvider(anyString())).thenReturn(provider1); + + StreamConsumerFactory factory2 = mock(StreamConsumerFactory.class); + when(factory2.createStreamMetadataProvider(anyString())).thenReturn(provider2); + + StreamConsumerFactory factory3 = mock(StreamConsumerFactory.class); + when(factory3.createStreamMetadataProvider(anyString())).thenReturn(provider3); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig1)).thenReturn(factory1); + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig2)).thenReturn(factory2); + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig3)).thenReturn(factory3); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList(), false, false); + + // Execute + Boolean result = fetcher.call(); + + // Verify: should return TRUE, non-existent topic is skipped + Assert.assertTrue(result); + Assert.assertNull(fetcher.getException()); + + // Verify: metadata only from topic1 (index 0) and topic3 (index 2) + List resultMetadata = fetcher.getPartitionGroupMetadataList(); + Assert.assertEquals(resultMetadata.size(), 2); + + List partitionIds = resultMetadata.stream() + .map(PartitionGroupMetadata::getPartitionGroupId) + .sorted() + .collect(Collectors.toList()); + + // Partition IDs: 0 from topic1 (index 0), 20000 from topic3 (index 2) + Assert.assertEquals(partitionIds, Arrays.asList(0, 20000)); + } + } + + @Test + public void testFetchMultipleStreamsProceedsWhenGetTopicsUnsupported() + throws Exception { + // Setup: getTopics() throws UnsupportedOperationException, should proceed without validation + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); + StreamConfig streamConfig2 = createMockStreamConfig("topic2", "test-table", false); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2); + + List statusList = Collections.emptyList(); + + PartitionGroupMetadata mockedMetadata = new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class)); + + StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + // getTopics() throws UnsupportedOperationException (default behavior for non-Kafka streams) + when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); + when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt(), anyBoolean())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamConsumerFactory factory = mock(StreamConsumerFactory.class); + when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList(), false, false); + + // Execute + Boolean result = fetcher.call(); + + // Verify: should return TRUE, processing continues despite UnsupportedOperationException + Assert.assertTrue(result); + Assert.assertNull(fetcher.getException()); + + // Verify: metadata from both topics should be collected + List resultMetadata = fetcher.getPartitionGroupMetadataList(); + Assert.assertEquals(resultMetadata.size(), 2); + + List partitionIds = resultMetadata.stream() + .map(PartitionGroupMetadata::getPartitionGroupId) + .sorted() + .collect(Collectors.toList()); + + Assert.assertEquals(partitionIds, Arrays.asList(0, 10000)); + } + } + + @Test + public void testFetchMultipleStreamsTopicExistsCheckPasses() + throws Exception { + // Setup: All topics exist, processing should proceed normally + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", true); + StreamConfig streamConfig2 = createMockStreamConfig("topic2", "test-table", true); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2); + + List statusList = Collections.emptyList(); + + PartitionGroupMetadata mockedMetadata = new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class)); + + // Create topic metadata for both topics + StreamMetadataProvider.TopicMetadata topic1Metadata = mock(StreamMetadataProvider.TopicMetadata.class); + when(topic1Metadata.getName()).thenReturn("topic1"); + StreamMetadataProvider.TopicMetadata topic2Metadata = mock(StreamMetadataProvider.TopicMetadata.class); + when(topic2Metadata.getName()).thenReturn("topic2"); + + StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + when(metadataProvider.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic2Metadata)); + when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt(), anyBoolean())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamConsumerFactory factory = mock(StreamConsumerFactory.class); + when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList(), false, false); + + // Execute + Boolean result = fetcher.call(); + + // Verify: should return TRUE + Assert.assertTrue(result); + Assert.assertNull(fetcher.getException()); + + // Verify: metadata from both topics should be collected + List resultMetadata = fetcher.getPartitionGroupMetadataList(); + Assert.assertEquals(resultMetadata.size(), 2); + + List partitionIds = resultMetadata.stream() + .map(PartitionGroupMetadata::getPartitionGroupId) + .sorted() + .collect(Collectors.toList()); + + Assert.assertEquals(partitionIds, Arrays.asList(0, 10000)); + } + } + + private StreamConfig createMockStreamConfig(String topicName, String tableName, + boolean topicExistenceCheckEnabled) { StreamConfig streamConfig = mock(StreamConfig.class); when(streamConfig.getTopicName()).thenReturn(topicName); when(streamConfig.getTableNameWithType()).thenReturn(tableName); + Map configsMap = new HashMap<>(); + if (topicExistenceCheckEnabled) { + configsMap.put(StreamConfigProperties.MULTITOPIC_SKIP_MISSING_TOPICS, "true"); + } + when(streamConfig.getStreamConfigsMap()).thenReturn(configsMap); return streamConfig; } }