From dea1a7da04935a0c6b69bf5d636d30afb09c8e52 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Fri, 6 Feb 2026 23:19:43 +0000 Subject: [PATCH 01/16] Fix: Handle kafka topic not found exception in fetchPartitionCount function --- .../spi/stream/StreamMetadataProvider.java | 26 ++- .../stream/StreamMetadataProviderTest.java | 185 ++++++++++++++++++ 2 files changed, 210 insertions(+), 1 deletion(-) create mode 100644 pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 4b5666a7a35a..9d9e7b3ce362 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -30,6 +30,8 @@ import javax.annotation.Nullable; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -39,6 +41,7 @@ @InterfaceStability.Stable public interface StreamMetadataProvider extends Closeable { + Logger LOGGER = LoggerFactory.getLogger(StreamMetadataProvider.class); /** * Fetches the number of partitions for a topic given the stream configs * @param timeoutMillis Fetch timeout @@ -89,7 +92,28 @@ StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteri default List computePartitionGroupMetadata(String clientId, StreamConfig streamConfig, List partitionGroupConsumptionStatuses, int timeoutMillis) throws IOException, TimeoutException { - int partitionCount = fetchPartitionCount(timeoutMillis); + + int partitionCount; + try { + partitionCount = fetchPartitionCount(timeoutMillis); + } catch (RuntimeException e) { + LOGGER.warn("Failed to fetch partition count for stream config: {}. " + + "Skipping stream and using existing partitions only. Error: {}", + streamConfig.getTopicName(), e.getMessage(), e); + // Return only the existing partition groups if we can't fetch partition count + // Add a PartitionGroupMetadata into the list, foreach partition already present in current. + // Setting endOffset (exclusive) as the startOffset for new partition group. + // If partition group is still in progress, this value will be null + List existingPartitionGroupMetadataList = + new ArrayList<>(partitionGroupConsumptionStatuses.size()); + for (PartitionGroupConsumptionStatus currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) { + existingPartitionGroupMetadataList.add( + new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getStreamPartitionGroupId(), + currentPartitionGroupConsumptionStatus.getEndOffset())); + } + return existingPartitionGroupMetadataList; + } + List newPartitionGroupMetadataList = new ArrayList<>(partitionCount); // Add a PartitionGroupMetadata into the list, foreach partition already present in current. diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java new file mode 100644 index 000000000000..7b442926adb4 --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Unit tests for StreamMetadataProvider interface, specifically testing exception handling + * in the computePartitionGroupMetadata method. + */ +public class StreamMetadataProviderTest { + + private static final String CLIENT_ID = "test-client"; + private static final int TIMEOUT_MILLIS = 5000; + private static final String TOPIC_NAME = "test-topic"; + + @Mock + private StreamConfig _streamConfig; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.openMocks(this); + + // Set up stream config mock + when(_streamConfig.getTopicName()).thenReturn(TOPIC_NAME); + } + + @Test + public void testComputePartitionGroupMetadataWithFetchPartitionCountException() throws Exception { + // Create existing partition group consumption statuses + List existingStatuses = createExistingPartitionStatuses(); + + // Create a StreamMetadataProvider that throws exception on fetchPartitionCount + StreamMetadataProvider provider = new TestStreamMetadataProviderWithException(); + + // Call computePartitionGroupMetadata + List result = provider.computePartitionGroupMetadata( + CLIENT_ID, _streamConfig, existingStatuses, TIMEOUT_MILLIS); + + // Verify that the method returns existing partitions only + Assert.assertEquals(result.size(), existingStatuses.size()); + + // Verify that the returned metadata matches the existing statuses + for (int i = 0; i < result.size(); i++) { + PartitionGroupMetadata metadata = result.get(i); + PartitionGroupConsumptionStatus status = existingStatuses.get(i); + + Assert.assertEquals(metadata.getPartitionGroupId(), status.getStreamPartitionGroupId()); + Assert.assertEquals(metadata.getStartOffset(), status.getEndOffset()); + } + } + + @Test + public void testComputePartitionGroupMetadataWithEmptyExistingStatuses() throws Exception { + // Test with empty existing partition statuses + List emptyStatuses = new ArrayList<>(); + + StreamMetadataProvider provider = new TestStreamMetadataProviderWithException(); + + List result = provider.computePartitionGroupMetadata( + CLIENT_ID, _streamConfig, emptyStatuses, TIMEOUT_MILLIS); + + // Should return empty list when no existing partitions and fetchPartitionCount fails + Assert.assertEquals(result.size(), 0); + } + + @Test + public void testComputePartitionGroupMetadataWithDifferentExceptionTypes() throws Exception { + List existingStatuses = createExistingPartitionStatuses(); + + // Test with RuntimeException + StreamMetadataProvider providerWithRuntimeException = new StreamMetadataProvider() { + @Override + public int fetchPartitionCount(long timeoutMillis) { + throw new RuntimeException("Connection failed"); + } + + @Override + public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) + throws TimeoutException { + return mock(StreamPartitionMsgOffset.class); + } + + @Override + public void close() throws IOException { + } + }; + + List result = providerWithRuntimeException.computePartitionGroupMetadata( + CLIENT_ID, _streamConfig, existingStatuses, TIMEOUT_MILLIS); + + Assert.assertEquals(result.size(), existingStatuses.size()); + + // Test with TimeoutException + StreamMetadataProvider providerWithTimeoutException = new StreamMetadataProvider() { + @Override + public int fetchPartitionCount(long timeoutMillis) { + throw new RuntimeException(new TimeoutException("Timeout occurred")); + } + + @Override + public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) + throws TimeoutException { + return mock(StreamPartitionMsgOffset.class); + } + + @Override + public void close() throws IOException { + } + }; + + result = providerWithTimeoutException.computePartitionGroupMetadata( + CLIENT_ID, _streamConfig, existingStatuses, TIMEOUT_MILLIS); + + Assert.assertEquals(result.size(), existingStatuses.size()); + } + + /** + * Creates a list of existing partition group consumption statuses for testing + */ + private List createExistingPartitionStatuses() { + List statuses = new ArrayList<>(); + + // Create mock offset + StreamPartitionMsgOffset mockOffset1 = mock(StreamPartitionMsgOffset.class); + StreamPartitionMsgOffset mockOffset2 = mock(StreamPartitionMsgOffset.class); + + statuses.add(new PartitionGroupConsumptionStatus(0, 1, mockOffset1, mockOffset1, "CONSUMING")); + statuses.add(new PartitionGroupConsumptionStatus(1, 2, mockOffset2, mockOffset2, "CONSUMING")); + + return statuses; + } + + /** + * Test implementation of StreamMetadataProvider that throws an exception + * when fetchPartitionCount is called + */ + private static class TestStreamMetadataProviderWithException implements StreamMetadataProvider { + + @Override + public int fetchPartitionCount(long timeoutMillis) { + throw new RuntimeException("Simulated partition count fetch failure"); + } + + @Override + public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) + throws TimeoutException { + // This should not be called in the exception scenario + return mock(StreamPartitionMsgOffset.class); + } + + @Override + public void close() throws IOException { + // No-op for test + } + } +} From 3ecbce6b7680921f87297d17459bdb21527b9752 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Tue, 10 Feb 2026 23:29:05 +0000 Subject: [PATCH 02/16] diff review comments addressed --- .../stream/PartitionGroupMetadataFetcher.java | 4 +- .../spi/stream/StreamMetadataProvider.java | 26 +---- .../PartitionGroupMetadataFetcherTest.java | 108 ++++++++++++++++++ 3 files changed, 111 insertions(+), 27 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 698ad472e1a1..16be0f0c107c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -140,9 +140,9 @@ private Boolean fetchMultipleStreams() _exception = e; return Boolean.FALSE; } catch (Exception e) { - LOGGER.warn("Could not get partition count for topic {}", topicName, e); + LOGGER.warn("Could not get partition count for topic {}. This topic will be skipped from ingestion.", + _topicNames.get(i), e); _exception = e; - throw e; } } return Boolean.TRUE; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 9d9e7b3ce362..4b5666a7a35a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -30,8 +30,6 @@ import javax.annotation.Nullable; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.annotations.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -41,7 +39,6 @@ @InterfaceStability.Stable public interface StreamMetadataProvider extends Closeable { - Logger LOGGER = LoggerFactory.getLogger(StreamMetadataProvider.class); /** * Fetches the number of partitions for a topic given the stream configs * @param timeoutMillis Fetch timeout @@ -92,28 +89,7 @@ StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteri default List computePartitionGroupMetadata(String clientId, StreamConfig streamConfig, List partitionGroupConsumptionStatuses, int timeoutMillis) throws IOException, TimeoutException { - - int partitionCount; - try { - partitionCount = fetchPartitionCount(timeoutMillis); - } catch (RuntimeException e) { - LOGGER.warn("Failed to fetch partition count for stream config: {}. " - + "Skipping stream and using existing partitions only. Error: {}", - streamConfig.getTopicName(), e.getMessage(), e); - // Return only the existing partition groups if we can't fetch partition count - // Add a PartitionGroupMetadata into the list, foreach partition already present in current. - // Setting endOffset (exclusive) as the startOffset for new partition group. - // If partition group is still in progress, this value will be null - List existingPartitionGroupMetadataList = - new ArrayList<>(partitionGroupConsumptionStatuses.size()); - for (PartitionGroupConsumptionStatus currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) { - existingPartitionGroupMetadataList.add( - new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getStreamPartitionGroupId(), - currentPartitionGroupConsumptionStatus.getEndOffset())); - } - return existingPartitionGroupMetadataList; - } - + int partitionCount = fetchPartitionCount(timeoutMillis); List newPartitionGroupMetadataList = new ArrayList<>(partitionCount); // Add a PartitionGroupMetadata into the list, foreach partition already present in current. diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index 9fa65254b636..eba306885de9 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -207,6 +207,114 @@ public void testFetchMultipleStreamsWithPause() } } + @Test + public void testFetchMultipleStreamsWithExceptionContinuesProcessing() + throws Exception { + // Setup: 3 streams where the second one throws an exception + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); + StreamConfig streamConfig2 = createMockStreamConfig("topic2-failing", "test-table", false); + StreamConfig streamConfig3 = createMockStreamConfig("topic3", "test-table", false); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2, streamConfig3); + + PartitionGroupConsumptionStatus status1 = new PartitionGroupConsumptionStatus(0, 0, null, null, "IN_PROGRESS"); + List statusList = Collections.singletonList(status1); + + PartitionGroupMetadata mockedMetadata = new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class)); + + // Create separate metadata providers for each stream + StreamMetadataProvider successProvider1 = mock(StreamMetadataProvider.class); + when(successProvider1.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamMetadataProvider failingProvider = mock(StreamMetadataProvider.class); + when(failingProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt())) + .thenThrow(new RuntimeException("Failed to fetch partition count for topic2-failing")); + + StreamMetadataProvider successProvider3 = mock(StreamMetadataProvider.class); + when(successProvider3.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamConsumerFactory factory1 = mock(StreamConsumerFactory.class); + when(factory1.createStreamMetadataProvider(anyString())).thenReturn(successProvider1); + + StreamConsumerFactory factory2 = mock(StreamConsumerFactory.class); + when(factory2.createStreamMetadataProvider(anyString())).thenReturn(failingProvider); + + StreamConsumerFactory factory3 = mock(StreamConsumerFactory.class); + when(factory3.createStreamMetadataProvider(anyString())).thenReturn(successProvider3); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig1)).thenReturn(factory1); + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig2)).thenReturn(factory2); + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig3)).thenReturn(factory3); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList()); + + // Execute + Boolean result = fetcher.call(); + + // Verify: should return TRUE despite exception (continues processing other streams) + Assert.assertTrue(result); + + // Verify: exception should be recorded + Assert.assertNotNull(fetcher.getException()); + Assert.assertTrue(fetcher.getException() instanceof RuntimeException); + Assert.assertTrue(fetcher.getException().getMessage().contains("topic2-failing")); + + // Verify: metadata from successful streams (topic1 and topic3) should be collected + // topic1 returns partition 0, topic3 (index 2) returns partition 20000 + List resultMetadata = fetcher.getPartitionGroupMetadataList(); + Assert.assertEquals(resultMetadata.size(), 2); + + List partitionIds = resultMetadata.stream() + .map(PartitionGroupMetadata::getPartitionGroupId) + .sorted() + .collect(Collectors.toList()); + + // Partition IDs: 0 from topic1 (index 0), 20000 from topic3 (index 2) + Assert.assertEquals(partitionIds, Arrays.asList(0, 20000)); + } + } + + @Test + public void testFetchMultipleStreamsTransientExceptionStopsProcessing() + throws Exception { + // Setup: TransientConsumerException should cause immediate return with FALSE + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); + StreamConfig streamConfig2 = createMockStreamConfig("topic2", "test-table", false); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2); + + List statusList = Collections.emptyList(); + + StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt())) + .thenThrow(new TransientConsumerException(new RuntimeException("Transient error"))); + + StreamConsumerFactory factory = mock(StreamConsumerFactory.class); + when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList()); + + // Execute + Boolean result = fetcher.call(); + + // Verify: TransientConsumerException should return FALSE immediately + Assert.assertFalse(result); + Assert.assertTrue(fetcher.getException() instanceof TransientConsumerException); + } + } + private StreamConfig createMockStreamConfig(String topicName, String tableName, boolean isEphemeral) { StreamConfig streamConfig = mock(StreamConfig.class); when(streamConfig.getTopicName()).thenReturn(topicName); From a599cfedae1b8fb47aa2c42a18487635a68b1cc4 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Tue, 10 Feb 2026 23:31:58 +0000 Subject: [PATCH 03/16] diff review comments addressed --- .../stream/StreamMetadataProviderTest.java | 185 ------------------ 1 file changed, 185 deletions(-) delete mode 100644 pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java deleted file mode 100644 index 7b442926adb4..000000000000 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeoutException; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.testng.Assert; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -/** - * Unit tests for StreamMetadataProvider interface, specifically testing exception handling - * in the computePartitionGroupMetadata method. - */ -public class StreamMetadataProviderTest { - - private static final String CLIENT_ID = "test-client"; - private static final int TIMEOUT_MILLIS = 5000; - private static final String TOPIC_NAME = "test-topic"; - - @Mock - private StreamConfig _streamConfig; - - @BeforeMethod - public void setUp() { - MockitoAnnotations.openMocks(this); - - // Set up stream config mock - when(_streamConfig.getTopicName()).thenReturn(TOPIC_NAME); - } - - @Test - public void testComputePartitionGroupMetadataWithFetchPartitionCountException() throws Exception { - // Create existing partition group consumption statuses - List existingStatuses = createExistingPartitionStatuses(); - - // Create a StreamMetadataProvider that throws exception on fetchPartitionCount - StreamMetadataProvider provider = new TestStreamMetadataProviderWithException(); - - // Call computePartitionGroupMetadata - List result = provider.computePartitionGroupMetadata( - CLIENT_ID, _streamConfig, existingStatuses, TIMEOUT_MILLIS); - - // Verify that the method returns existing partitions only - Assert.assertEquals(result.size(), existingStatuses.size()); - - // Verify that the returned metadata matches the existing statuses - for (int i = 0; i < result.size(); i++) { - PartitionGroupMetadata metadata = result.get(i); - PartitionGroupConsumptionStatus status = existingStatuses.get(i); - - Assert.assertEquals(metadata.getPartitionGroupId(), status.getStreamPartitionGroupId()); - Assert.assertEquals(metadata.getStartOffset(), status.getEndOffset()); - } - } - - @Test - public void testComputePartitionGroupMetadataWithEmptyExistingStatuses() throws Exception { - // Test with empty existing partition statuses - List emptyStatuses = new ArrayList<>(); - - StreamMetadataProvider provider = new TestStreamMetadataProviderWithException(); - - List result = provider.computePartitionGroupMetadata( - CLIENT_ID, _streamConfig, emptyStatuses, TIMEOUT_MILLIS); - - // Should return empty list when no existing partitions and fetchPartitionCount fails - Assert.assertEquals(result.size(), 0); - } - - @Test - public void testComputePartitionGroupMetadataWithDifferentExceptionTypes() throws Exception { - List existingStatuses = createExistingPartitionStatuses(); - - // Test with RuntimeException - StreamMetadataProvider providerWithRuntimeException = new StreamMetadataProvider() { - @Override - public int fetchPartitionCount(long timeoutMillis) { - throw new RuntimeException("Connection failed"); - } - - @Override - public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) - throws TimeoutException { - return mock(StreamPartitionMsgOffset.class); - } - - @Override - public void close() throws IOException { - } - }; - - List result = providerWithRuntimeException.computePartitionGroupMetadata( - CLIENT_ID, _streamConfig, existingStatuses, TIMEOUT_MILLIS); - - Assert.assertEquals(result.size(), existingStatuses.size()); - - // Test with TimeoutException - StreamMetadataProvider providerWithTimeoutException = new StreamMetadataProvider() { - @Override - public int fetchPartitionCount(long timeoutMillis) { - throw new RuntimeException(new TimeoutException("Timeout occurred")); - } - - @Override - public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) - throws TimeoutException { - return mock(StreamPartitionMsgOffset.class); - } - - @Override - public void close() throws IOException { - } - }; - - result = providerWithTimeoutException.computePartitionGroupMetadata( - CLIENT_ID, _streamConfig, existingStatuses, TIMEOUT_MILLIS); - - Assert.assertEquals(result.size(), existingStatuses.size()); - } - - /** - * Creates a list of existing partition group consumption statuses for testing - */ - private List createExistingPartitionStatuses() { - List statuses = new ArrayList<>(); - - // Create mock offset - StreamPartitionMsgOffset mockOffset1 = mock(StreamPartitionMsgOffset.class); - StreamPartitionMsgOffset mockOffset2 = mock(StreamPartitionMsgOffset.class); - - statuses.add(new PartitionGroupConsumptionStatus(0, 1, mockOffset1, mockOffset1, "CONSUMING")); - statuses.add(new PartitionGroupConsumptionStatus(1, 2, mockOffset2, mockOffset2, "CONSUMING")); - - return statuses; - } - - /** - * Test implementation of StreamMetadataProvider that throws an exception - * when fetchPartitionCount is called - */ - private static class TestStreamMetadataProviderWithException implements StreamMetadataProvider { - - @Override - public int fetchPartitionCount(long timeoutMillis) { - throw new RuntimeException("Simulated partition count fetch failure"); - } - - @Override - public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) - throws TimeoutException { - // This should not be called in the exception scenario - return mock(StreamPartitionMsgOffset.class); - } - - @Override - public void close() throws IOException { - // No-op for test - } - } -} From ebbaa4fb8ec7910bccdae91f2e9040f0cdbac970 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Wed, 11 Feb 2026 20:06:16 +0000 Subject: [PATCH 04/16] Address diff review comments --- .../stream/PartitionGroupMetadataFetcher.java | 18 +- .../PartitionGroupMetadataFetcherTest.java | 183 ++++++++++++++++++ 2 files changed, 199 insertions(+), 2 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 16be0f0c107c..c3b340962d22 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -122,6 +122,20 @@ private Boolean fetchMultipleStreams() .collect(Collectors.toList()); try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider( StreamConsumerFactory.getUniqueClientId(clientId))) { + + // Check if the topic exists before fetching partition metadata + try { + List topics = streamMetadataProvider.getTopics(); + boolean topicExists = topics.stream().anyMatch(t -> t.getName().equals(topicName)); + if (!topicExists) { + LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); + continue; + } + } catch (Exception e) { + // getTopics() not supported by this stream type, proceed without topic existence validation + LOGGER.debug("Topic existence check not supported for stream type, proceeding for topic: {}", topicName); + } + _newPartitionGroupMetadataList.addAll( streamMetadataProvider.computePartitionGroupMetadata(clientId, streamConfig, topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, @@ -140,9 +154,9 @@ private Boolean fetchMultipleStreams() _exception = e; return Boolean.FALSE; } catch (Exception e) { - LOGGER.warn("Could not get partition count for topic {}. This topic will be skipped from ingestion.", - _topicNames.get(i), e); + LOGGER.warn("Could not get partition count for topic {}", topicName, e); _exception = e; + throw e; } } return Boolean.TRUE; diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index eba306885de9..765533c49446 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -124,6 +124,8 @@ public void testFetchMultipleStreams() PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1, mock(StreamPartitionMsgOffset.class)); StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + // Mock getTopics() to throw UnsupportedOperationException (bypasses topic existence check) + when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), any(List.class), anyInt(), anyBoolean())) .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2)); @@ -174,6 +176,8 @@ public void testFetchMultipleStreamsWithPause() PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1, mock(StreamPartitionMsgOffset.class)); StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + // Mock getTopics() to throw UnsupportedOperationException (bypasses topic existence check) + when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), any(List.class), anyInt(), anyBoolean())) .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2)); @@ -315,6 +319,185 @@ public void testFetchMultipleStreamsTransientExceptionStopsProcessing() } } + @Test + public void testFetchMultipleStreamsSkipsNonExistentTopic() + throws Exception { + // Setup: 3 streams where the second topic does not exist + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); + StreamConfig streamConfig2 = createMockStreamConfig("topic2-nonexistent", "test-table", false); + StreamConfig streamConfig3 = createMockStreamConfig("topic3", "test-table", false); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2, streamConfig3); + + List statusList = Collections.emptyList(); + + PartitionGroupMetadata mockedMetadata = new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class)); + + // Create topic metadata for existing topics only (topic1 and topic3) + StreamMetadataProvider.TopicMetadata topic1Metadata = mock(StreamMetadataProvider.TopicMetadata.class); + when(topic1Metadata.getName()).thenReturn("topic1"); + StreamMetadataProvider.TopicMetadata topic3Metadata = mock(StreamMetadataProvider.TopicMetadata.class); + when(topic3Metadata.getName()).thenReturn("topic3"); + + // Provider for topic1 - topic exists + StreamMetadataProvider provider1 = mock(StreamMetadataProvider.class); + when(provider1.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic3Metadata)); + when(provider1.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + // Provider for topic2 - topic does NOT exist (getTopics returns list without topic2) + StreamMetadataProvider provider2 = mock(StreamMetadataProvider.class); + when(provider2.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic3Metadata)); + + // Provider for topic3 - topic exists + StreamMetadataProvider provider3 = mock(StreamMetadataProvider.class); + when(provider3.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic3Metadata)); + when(provider3.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamConsumerFactory factory1 = mock(StreamConsumerFactory.class); + when(factory1.createStreamMetadataProvider(anyString())).thenReturn(provider1); + + StreamConsumerFactory factory2 = mock(StreamConsumerFactory.class); + when(factory2.createStreamMetadataProvider(anyString())).thenReturn(provider2); + + StreamConsumerFactory factory3 = mock(StreamConsumerFactory.class); + when(factory3.createStreamMetadataProvider(anyString())).thenReturn(provider3); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig1)).thenReturn(factory1); + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig2)).thenReturn(factory2); + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig3)).thenReturn(factory3); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList()); + + // Execute + Boolean result = fetcher.call(); + + // Verify: should return TRUE, non-existent topic is skipped + Assert.assertTrue(result); + Assert.assertNull(fetcher.getException()); + + // Verify: metadata only from topic1 (index 0) and topic3 (index 2) + List resultMetadata = fetcher.getPartitionGroupMetadataList(); + Assert.assertEquals(resultMetadata.size(), 2); + + List partitionIds = resultMetadata.stream() + .map(PartitionGroupMetadata::getPartitionGroupId) + .sorted() + .collect(Collectors.toList()); + + // Partition IDs: 0 from topic1 (index 0), 20000 from topic3 (index 2) + Assert.assertEquals(partitionIds, Arrays.asList(0, 20000)); + } + } + + @Test + public void testFetchMultipleStreamsProceedsWhenGetTopicsUnsupported() + throws Exception { + // Setup: getTopics() throws UnsupportedOperationException, should proceed without validation + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); + StreamConfig streamConfig2 = createMockStreamConfig("topic2", "test-table", false); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2); + + List statusList = Collections.emptyList(); + + PartitionGroupMetadata mockedMetadata = new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class)); + + StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + // getTopics() throws UnsupportedOperationException (default behavior for non-Kafka streams) + when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); + when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamConsumerFactory factory = mock(StreamConsumerFactory.class); + when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList()); + + // Execute + Boolean result = fetcher.call(); + + // Verify: should return TRUE, processing continues despite UnsupportedOperationException + Assert.assertTrue(result); + Assert.assertNull(fetcher.getException()); + + // Verify: metadata from both topics should be collected + List resultMetadata = fetcher.getPartitionGroupMetadataList(); + Assert.assertEquals(resultMetadata.size(), 2); + + List partitionIds = resultMetadata.stream() + .map(PartitionGroupMetadata::getPartitionGroupId) + .sorted() + .collect(Collectors.toList()); + + Assert.assertEquals(partitionIds, Arrays.asList(0, 10000)); + } + } + + @Test + public void testFetchMultipleStreamsTopicExistsCheckPasses() + throws Exception { + // Setup: All topics exist, processing should proceed normally + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); + StreamConfig streamConfig2 = createMockStreamConfig("topic2", "test-table", false); + List streamConfigs = Arrays.asList(streamConfig1, streamConfig2); + + List statusList = Collections.emptyList(); + + PartitionGroupMetadata mockedMetadata = new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class)); + + // Create topic metadata for both topics + StreamMetadataProvider.TopicMetadata topic1Metadata = mock(StreamMetadataProvider.TopicMetadata.class); + when(topic1Metadata.getName()).thenReturn("topic1"); + StreamMetadataProvider.TopicMetadata topic2Metadata = mock(StreamMetadataProvider.TopicMetadata.class); + when(topic2Metadata.getName()).thenReturn("topic2"); + + StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + when(metadataProvider.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic2Metadata)); + when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), + any(List.class), anyInt())) + .thenReturn(Collections.singletonList(mockedMetadata)); + + StreamConsumerFactory factory = mock(StreamConsumerFactory.class); + when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider); + + try (MockedStatic mockedProvider = Mockito.mockStatic( + StreamConsumerFactoryProvider.class)) { + mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); + + PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( + streamConfigs, statusList, Collections.emptyList()); + + // Execute + Boolean result = fetcher.call(); + + // Verify: should return TRUE + Assert.assertTrue(result); + Assert.assertNull(fetcher.getException()); + + // Verify: metadata from both topics should be collected + List resultMetadata = fetcher.getPartitionGroupMetadataList(); + Assert.assertEquals(resultMetadata.size(), 2); + + List partitionIds = resultMetadata.stream() + .map(PartitionGroupMetadata::getPartitionGroupId) + .sorted() + .collect(Collectors.toList()); + + Assert.assertEquals(partitionIds, Arrays.asList(0, 10000)); + } + } + private StreamConfig createMockStreamConfig(String topicName, String tableName, boolean isEphemeral) { StreamConfig streamConfig = mock(StreamConfig.class); when(streamConfig.getTopicName()).thenReturn(topicName); From 14faf05fc09afdd3c3bd56777f618a434384443f Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Fri, 20 Feb 2026 23:22:04 +0000 Subject: [PATCH 05/16] addressed diff review comments --- .../stream/PartitionGroupMetadataFetcher.java | 57 +++++++++++++++---- .../PartitionGroupMetadataFetcherTest.java | 30 +++++----- 2 files changed, 64 insertions(+), 23 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index c3b340962d22..05c22ef33033 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.stream.Collectors; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -33,6 +34,7 @@ */ public class PartitionGroupMetadataFetcher implements Callable { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); + private static final String TOPIC_EXISTENCE_CHECK_ENABLED = TOPIC_EXISTENCE_CHECK_ENABLED; private final List _streamConfigs; private final List _partitionGroupConsumptionStatusList; @@ -102,6 +104,10 @@ private Boolean fetchSingleStream() private Boolean fetchMultipleStreams() throws Exception { int numStreams = _streamConfigs.size(); + + // Fetch available topics once and reuse across all streams (for topic existence validation) + Set availableTopicNames = fetchAvailableTopicNames(); + for (int i = 0; i < numStreams; i++) { if (_pausedTopicIndices.contains(i)) { LOGGER.info("Skipping fetching PartitionGroupMetadata for paused topic: {}", @@ -124,16 +130,12 @@ private Boolean fetchMultipleStreams() StreamConsumerFactory.getUniqueClientId(clientId))) { // Check if the topic exists before fetching partition metadata - try { - List topics = streamMetadataProvider.getTopics(); - boolean topicExists = topics.stream().anyMatch(t -> t.getName().equals(topicName)); - if (!topicExists) { - LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); - continue; - } - } catch (Exception e) { - // getTopics() not supported by this stream type, proceed without topic existence validation - LOGGER.debug("Topic existence check not supported for stream type, proceeding for topic: {}", topicName); + // Only perform this check if topic existence validation is enabled and topics were fetched + boolean checkTopicExists = Boolean.parseBoolean( + streamConfig.getStreamConfigsMap().getOrDefault(TOPIC_EXISTENCE_CHECK_ENABLED, "false")); + if (checkTopicExists && availableTopicNames != null && !availableTopicNames.contains(topicName)) { + LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); + continue; } _newPartitionGroupMetadataList.addAll( @@ -161,4 +163,39 @@ private Boolean fetchMultipleStreams() } return Boolean.TRUE; } + + /** + * Fetches available topic names from the stream provider. + * Uses the first stream config that has topic existence check enabled. + * + * @return Set of available topic names, or null if topics could not be fetched + */ + private Set fetchAvailableTopicNames() { + // Find first stream config with topic existence check enabled + StreamConfig streamConfigForTopicFetch = _streamConfigs.stream() + .filter(config -> Boolean.parseBoolean( + config.getStreamConfigsMap().getOrDefault(TOPIC_EXISTENCE_CHECK_ENABLED, "false"))) + .findFirst() + .orElse(null); + + if (streamConfigForTopicFetch == null) { + return null; + } + + String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-topicFetch-" + + streamConfigForTopicFetch.getTableNameWithType(); + StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfigForTopicFetch); + + try (StreamMetadataProvider provider = factory.createStreamMetadataProvider(clientId)) { + return provider.getTopics().stream() + .map(StreamMetadataProvider.TopicMetadata::getName) + .collect(Collectors.toSet()); + } catch (UnsupportedOperationException e) { + LOGGER.debug("getTopics() not supported for stream type, skipping topic existence validation"); + return null; + } catch (Exception e) { + LOGGER.warn("Failed to fetch available topics, skipping topic existence validation", e); + return null; + } + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index 765533c49446..946c33b08744 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -227,18 +227,21 @@ public void testFetchMultipleStreamsWithExceptionContinuesProcessing() // Create separate metadata providers for each stream StreamMetadataProvider successProvider1 = mock(StreamMetadataProvider.class); + when(successProvider1.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(successProvider1.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), - any(List.class), anyInt())) + any(List.class), anyInt(), anyBoolean())) .thenReturn(Collections.singletonList(mockedMetadata)); StreamMetadataProvider failingProvider = mock(StreamMetadataProvider.class); + when(failingProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(failingProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), - any(List.class), anyInt())) + any(List.class), anyInt(), anyBoolean())) .thenThrow(new RuntimeException("Failed to fetch partition count for topic2-failing")); StreamMetadataProvider successProvider3 = mock(StreamMetadataProvider.class); + when(successProvider3.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(successProvider3.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), - any(List.class), anyInt())) + any(List.class), anyInt(), anyBoolean())) .thenReturn(Collections.singletonList(mockedMetadata)); StreamConsumerFactory factory1 = mock(StreamConsumerFactory.class); @@ -257,7 +260,7 @@ public void testFetchMultipleStreamsWithExceptionContinuesProcessing() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig3)).thenReturn(factory3); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList()); + streamConfigs, statusList, Collections.emptyList(), false); // Execute Boolean result = fetcher.call(); @@ -296,8 +299,9 @@ public void testFetchMultipleStreamsTransientExceptionStopsProcessing() List statusList = Collections.emptyList(); StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); + when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), - any(List.class), anyInt())) + any(List.class), anyInt(), anyBoolean())) .thenThrow(new TransientConsumerException(new RuntimeException("Transient error"))); StreamConsumerFactory factory = mock(StreamConsumerFactory.class); @@ -308,7 +312,7 @@ public void testFetchMultipleStreamsTransientExceptionStopsProcessing() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList()); + streamConfigs, statusList, Collections.emptyList(), false); // Execute Boolean result = fetcher.call(); @@ -342,7 +346,7 @@ public void testFetchMultipleStreamsSkipsNonExistentTopic() StreamMetadataProvider provider1 = mock(StreamMetadataProvider.class); when(provider1.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic3Metadata)); when(provider1.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), - any(List.class), anyInt())) + any(List.class), anyInt(), anyBoolean())) .thenReturn(Collections.singletonList(mockedMetadata)); // Provider for topic2 - topic does NOT exist (getTopics returns list without topic2) @@ -353,7 +357,7 @@ public void testFetchMultipleStreamsSkipsNonExistentTopic() StreamMetadataProvider provider3 = mock(StreamMetadataProvider.class); when(provider3.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic3Metadata)); when(provider3.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), - any(List.class), anyInt())) + any(List.class), anyInt(), anyBoolean())) .thenReturn(Collections.singletonList(mockedMetadata)); StreamConsumerFactory factory1 = mock(StreamConsumerFactory.class); @@ -372,7 +376,7 @@ public void testFetchMultipleStreamsSkipsNonExistentTopic() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig3)).thenReturn(factory3); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList()); + streamConfigs, statusList, Collections.emptyList(), false); // Execute Boolean result = fetcher.call(); @@ -411,7 +415,7 @@ public void testFetchMultipleStreamsProceedsWhenGetTopicsUnsupported() // getTopics() throws UnsupportedOperationException (default behavior for non-Kafka streams) when(metadataProvider.getTopics()).thenThrow(new UnsupportedOperationException("Not supported")); when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), - any(List.class), anyInt())) + any(List.class), anyInt(), anyBoolean())) .thenReturn(Collections.singletonList(mockedMetadata)); StreamConsumerFactory factory = mock(StreamConsumerFactory.class); @@ -422,7 +426,7 @@ public void testFetchMultipleStreamsProceedsWhenGetTopicsUnsupported() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList()); + streamConfigs, statusList, Collections.emptyList(), false); // Execute Boolean result = fetcher.call(); @@ -465,7 +469,7 @@ public void testFetchMultipleStreamsTopicExistsCheckPasses() StreamMetadataProvider metadataProvider = mock(StreamMetadataProvider.class); when(metadataProvider.getTopics()).thenReturn(Arrays.asList(topic1Metadata, topic2Metadata)); when(metadataProvider.computePartitionGroupMetadata(anyString(), any(StreamConfig.class), - any(List.class), anyInt())) + any(List.class), anyInt(), anyBoolean())) .thenReturn(Collections.singletonList(mockedMetadata)); StreamConsumerFactory factory = mock(StreamConsumerFactory.class); @@ -476,7 +480,7 @@ public void testFetchMultipleStreamsTopicExistsCheckPasses() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList()); + streamConfigs, statusList, Collections.emptyList(), false); // Execute Boolean result = fetcher.call(); From 64fac9cba8797150586932b204497ccf5926d37b Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Mon, 23 Feb 2026 17:51:30 +0000 Subject: [PATCH 06/16] fix build error and unit tests --- .../stream/PartitionGroupMetadataFetcher.java | 4 ++-- .../PartitionGroupMetadataFetcherTest.java | 20 +++++++++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 05c22ef33033..740d919ba331 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -34,7 +34,7 @@ */ public class PartitionGroupMetadataFetcher implements Callable { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); - private static final String TOPIC_EXISTENCE_CHECK_ENABLED = TOPIC_EXISTENCE_CHECK_ENABLED; + private static final String TOPIC_EXISTENCE_CHECK_ENABLED = "topic.existence.check.enabled"; private final List _streamConfigs; private final List _partitionGroupConsumptionStatusList; @@ -158,7 +158,7 @@ private Boolean fetchMultipleStreams() } catch (Exception e) { LOGGER.warn("Could not get partition count for topic {}", topicName, e); _exception = e; - throw e; + // Continue processing other streams (don't throw for multiple streams) } } return Boolean.TRUE; diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index 946c33b08744..48d565768e7f 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -20,7 +20,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -327,9 +329,9 @@ public void testFetchMultipleStreamsTransientExceptionStopsProcessing() public void testFetchMultipleStreamsSkipsNonExistentTopic() throws Exception { // Setup: 3 streams where the second topic does not exist - StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); - StreamConfig streamConfig2 = createMockStreamConfig("topic2-nonexistent", "test-table", false); - StreamConfig streamConfig3 = createMockStreamConfig("topic3", "test-table", false); + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", true); + StreamConfig streamConfig2 = createMockStreamConfig("topic2-nonexistent", "test-table", true); + StreamConfig streamConfig3 = createMockStreamConfig("topic3", "test-table", true); List streamConfigs = Arrays.asList(streamConfig1, streamConfig2, streamConfig3); List statusList = Collections.emptyList(); @@ -452,8 +454,8 @@ public void testFetchMultipleStreamsProceedsWhenGetTopicsUnsupported() public void testFetchMultipleStreamsTopicExistsCheckPasses() throws Exception { // Setup: All topics exist, processing should proceed normally - StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); - StreamConfig streamConfig2 = createMockStreamConfig("topic2", "test-table", false); + StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", true); + StreamConfig streamConfig2 = createMockStreamConfig("topic2", "test-table", true); List streamConfigs = Arrays.asList(streamConfig1, streamConfig2); List statusList = Collections.emptyList(); @@ -502,10 +504,16 @@ public void testFetchMultipleStreamsTopicExistsCheckPasses() } } - private StreamConfig createMockStreamConfig(String topicName, String tableName, boolean isEphemeral) { + private StreamConfig createMockStreamConfig(String topicName, String tableName, + boolean topicExistenceCheckEnabled) { StreamConfig streamConfig = mock(StreamConfig.class); when(streamConfig.getTopicName()).thenReturn(topicName); when(streamConfig.getTableNameWithType()).thenReturn(tableName); + Map configsMap = new HashMap<>(); + if (topicExistenceCheckEnabled) { + configsMap.put("topic.existence.check.enabled", "true"); + } + when(streamConfig.getStreamConfigsMap()).thenReturn(configsMap); return streamConfig; } } From 79bb0076a8fa2efa71119ba14c2b38d223d21082 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Mon, 23 Feb 2026 18:41:55 +0000 Subject: [PATCH 07/16] diff review comments addressed --- .../pinot/spi/stream/PartitionGroupMetadataFetcher.java | 6 +++--- .../org/apache/pinot/spi/stream/StreamConfigProperties.java | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 740d919ba331..580e5361994f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -34,7 +34,6 @@ */ public class PartitionGroupMetadataFetcher implements Callable { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); - private static final String TOPIC_EXISTENCE_CHECK_ENABLED = "topic.existence.check.enabled"; private final List _streamConfigs; private final List _partitionGroupConsumptionStatusList; @@ -132,7 +131,8 @@ private Boolean fetchMultipleStreams() // Check if the topic exists before fetching partition metadata // Only perform this check if topic existence validation is enabled and topics were fetched boolean checkTopicExists = Boolean.parseBoolean( - streamConfig.getStreamConfigsMap().getOrDefault(TOPIC_EXISTENCE_CHECK_ENABLED, "false")); + streamConfig.getStreamConfigsMap() + .getOrDefault(StreamConfigProperties.TOPIC_EXISTENCE_CHECK_ENABLED, "false")); if (checkTopicExists && availableTopicNames != null && !availableTopicNames.contains(topicName)) { LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); continue; @@ -174,7 +174,7 @@ private Set fetchAvailableTopicNames() { // Find first stream config with topic existence check enabled StreamConfig streamConfigForTopicFetch = _streamConfigs.stream() .filter(config -> Boolean.parseBoolean( - config.getStreamConfigsMap().getOrDefault(TOPIC_EXISTENCE_CHECK_ENABLED, "false"))) + config.getStreamConfigsMap().getOrDefault(StreamConfigProperties.TOPIC_EXISTENCE_CHECK_ENABLED, "false"))) .findFirst() .orElse(null); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java index 762720ab59bc..b236b68b944c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java @@ -48,6 +48,7 @@ private StreamConfigProperties() { public static final String GROUP_ID = "hlc.group.id"; public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = "partition.offset.factory.class.name"; public static final String TOPIC_CONSUMPTION_RATE_LIMIT = "topic.consumption.rate.limit"; + public static final String TOPIC_EXISTENCE_CHECK_ENABLED = "topic.existence.check.enabled"; public static final String METADATA_POPULATE = "metadata.populate"; /** From c010ab591bdbab51ecf6380888fb58d38243fdaa Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Mon, 23 Feb 2026 18:47:56 +0000 Subject: [PATCH 08/16] diff review comments addressed --- .../apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 580e5361994f..0fa893573ac2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -158,7 +158,7 @@ private Boolean fetchMultipleStreams() } catch (Exception e) { LOGGER.warn("Could not get partition count for topic {}", topicName, e); _exception = e; - // Continue processing other streams (don't throw for multiple streams) + throw e; } } return Boolean.TRUE; From a5569558a2fd28ffe493654f799999294db72a79 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Mon, 23 Feb 2026 18:53:08 +0000 Subject: [PATCH 09/16] diff review comments addressed --- .../PartitionGroupMetadataFetcherTest.java | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index 48d565768e7f..895a1fd8ecaf 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -214,7 +214,7 @@ public void testFetchMultipleStreamsWithPause() } @Test - public void testFetchMultipleStreamsWithExceptionContinuesProcessing() + public void testFetchMultipleStreamsWithExceptionThrows() throws Exception { // Setup: 3 streams where the second one throws an exception StreamConfig streamConfig1 = createMockStreamConfig("topic1", "test-table", false); @@ -264,29 +264,22 @@ public void testFetchMultipleStreamsWithExceptionContinuesProcessing() PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( streamConfigs, statusList, Collections.emptyList(), false); - // Execute - Boolean result = fetcher.call(); - - // Verify: should return TRUE despite exception (continues processing other streams) - Assert.assertTrue(result); - - // Verify: exception should be recorded - Assert.assertNotNull(fetcher.getException()); - Assert.assertTrue(fetcher.getException() instanceof RuntimeException); - Assert.assertTrue(fetcher.getException().getMessage().contains("topic2-failing")); - - // Verify: metadata from successful streams (topic1 and topic3) should be collected - // topic1 returns partition 0, topic3 (index 2) returns partition 20000 + // Execute and verify exception is thrown + try { + fetcher.call(); + Assert.fail("Expected RuntimeException to be thrown"); + } catch (RuntimeException e) { + // Verify: exception should contain topic2-failing + Assert.assertTrue(e.getMessage().contains("topic2-failing")); + // Verify: exception should also be recorded + Assert.assertNotNull(fetcher.getException()); + Assert.assertEquals(e, fetcher.getException()); + } + + // Verify: only metadata from topic1 should be collected (before exception) List resultMetadata = fetcher.getPartitionGroupMetadataList(); - Assert.assertEquals(resultMetadata.size(), 2); - - List partitionIds = resultMetadata.stream() - .map(PartitionGroupMetadata::getPartitionGroupId) - .sorted() - .collect(Collectors.toList()); - - // Partition IDs: 0 from topic1 (index 0), 20000 from topic3 (index 2) - Assert.assertEquals(partitionIds, Arrays.asList(0, 20000)); + Assert.assertEquals(resultMetadata.size(), 1); + Assert.assertEquals(resultMetadata.get(0).getPartitionGroupId(), 0); } } From a98aaeb50dc0ef31da17a6c381765b7ad9c7b9a1 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Tue, 24 Feb 2026 00:00:04 +0000 Subject: [PATCH 10/16] diff review comments addressed --- .../pinot/spi/stream/PartitionGroupMetadataFetcher.java | 4 ++-- .../org/apache/pinot/spi/stream/StreamConfigProperties.java | 2 +- .../pinot/spi/stream/PartitionGroupMetadataFetcherTest.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 0fa893573ac2..2c9b8db4355d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -132,7 +132,7 @@ private Boolean fetchMultipleStreams() // Only perform this check if topic existence validation is enabled and topics were fetched boolean checkTopicExists = Boolean.parseBoolean( streamConfig.getStreamConfigsMap() - .getOrDefault(StreamConfigProperties.TOPIC_EXISTENCE_CHECK_ENABLED, "false")); + .getOrDefault(StreamConfigProperties.SKIP_MISSING_TOPICS, "false")); if (checkTopicExists && availableTopicNames != null && !availableTopicNames.contains(topicName)) { LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); continue; @@ -174,7 +174,7 @@ private Set fetchAvailableTopicNames() { // Find first stream config with topic existence check enabled StreamConfig streamConfigForTopicFetch = _streamConfigs.stream() .filter(config -> Boolean.parseBoolean( - config.getStreamConfigsMap().getOrDefault(StreamConfigProperties.TOPIC_EXISTENCE_CHECK_ENABLED, "false"))) + config.getStreamConfigsMap().getOrDefault(StreamConfigProperties.SKIP_MISSING_TOPICS, "false"))) .findFirst() .orElse(null); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java index b236b68b944c..b2ade311992e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java @@ -48,7 +48,7 @@ private StreamConfigProperties() { public static final String GROUP_ID = "hlc.group.id"; public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = "partition.offset.factory.class.name"; public static final String TOPIC_CONSUMPTION_RATE_LIMIT = "topic.consumption.rate.limit"; - public static final String TOPIC_EXISTENCE_CHECK_ENABLED = "topic.existence.check.enabled"; + public static final String SKIP_MISSING_TOPICS = "skip.missing.topics"; public static final String METADATA_POPULATE = "metadata.populate"; /** diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index 895a1fd8ecaf..3f6de5a78a69 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -504,7 +504,7 @@ private StreamConfig createMockStreamConfig(String topicName, String tableName, when(streamConfig.getTableNameWithType()).thenReturn(tableName); Map configsMap = new HashMap<>(); if (topicExistenceCheckEnabled) { - configsMap.put("topic.existence.check.enabled", "true"); + configsMap.put(StreamConfigProperties.SKIP_MISSING_TOPICS, "true"); } when(streamConfig.getStreamConfigsMap()).thenReturn(configsMap); return streamConfig; From 8420429ec393a34b6298860b7e164af89173d1a2 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Tue, 24 Feb 2026 00:04:40 +0000 Subject: [PATCH 11/16] diff review comments addressed --- .../pinot/spi/stream/PartitionGroupMetadataFetcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 2c9b8db4355d..0158dd50dc4d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -130,10 +130,10 @@ private Boolean fetchMultipleStreams() // Check if the topic exists before fetching partition metadata // Only perform this check if topic existence validation is enabled and topics were fetched - boolean checkTopicExists = Boolean.parseBoolean( + boolean skipMissingTopics = Boolean.parseBoolean( streamConfig.getStreamConfigsMap() .getOrDefault(StreamConfigProperties.SKIP_MISSING_TOPICS, "false")); - if (checkTopicExists && availableTopicNames != null && !availableTopicNames.contains(topicName)) { + if (skipMissingTopics && availableTopicNames != null && !availableTopicNames.contains(topicName)) { LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); continue; } From 774e9a8054ad5a5fa360249b5af51026e3bc8bf4 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Thu, 26 Feb 2026 21:48:21 +0000 Subject: [PATCH 12/16] address diff review comments --- .../helix/SegmentStatusChecker.java | 4 +++- .../core/PinotTableIdealStateBuilder.java | 6 +++-- .../MissingConsumingSegmentFinder.java | 5 ++-- .../PinotLLCRealtimeSegmentManager.java | 18 ++++++++++++-- .../ingestion/StreamIngestionConfig.java | 11 +++++++++ .../stream/PartitionGroupMetadataFetcher.java | 24 +++++++------------ .../pinot/spi/utils/IngestionConfigUtils.java | 13 ++++++++++ .../PartitionGroupMetadataFetcherTest.java | 18 +++++++------- 8 files changed, 68 insertions(+), 31 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index ab20f6fb7453..59523f1e432b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -474,8 +474,10 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon if (tableType == TableType.REALTIME && tableConfig != null) { List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); + boolean skipMissingTopics = IngestionConfigUtils.getSkipMissingTopics(tableConfig); + new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, - streamConfigs, idealState).findAndEmitMetrics(idealState); + streamConfigs, idealState, skipMissingTopics).findAndEmitMetrics(idealState); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 6ec48830ca71..67fba04182b2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -89,12 +89,14 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n * and is created using the latest segment zk metadata. * @param pausedTopicIndices List of inactive topic indices. Index is the index of the topic in the streamConfigMaps. * @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset + * @param skipMissingTopics whether to skip topics that don't exist during partition metadata fetch */ public static List getPartitionGroupMetadataList(List streamConfigs, List partitionGroupConsumptionStatusList, List pausedTopicIndices, - boolean forceGetOffsetFromStream) { + boolean forceGetOffsetFromStream, boolean skipMissingTopics) { PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new PartitionGroupMetadataFetcher( - streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream); + streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream, + skipMissingTopics); try { DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index 99bee6f8a7fe..ea74a1ca3e3b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -69,7 +69,8 @@ public class MissingConsumingSegmentFinder { private ControllerMetrics _controllerMetrics; public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore propertyStore, - ControllerMetrics controllerMetrics, List streamConfigs, IdealState idealState) { + ControllerMetrics controllerMetrics, List streamConfigs, IdealState idealState, + boolean skipMissingTopics) { _realtimeTableName = realtimeTableName; _controllerMetrics = controllerMetrics; _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics); @@ -85,7 +86,7 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt try { PauseState pauseState = PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState); PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), - pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false) + pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false, skipMissingTopics) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 55252182dd6a..5e2b39a31fea 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1201,7 +1201,7 @@ Set getPartitionIds(List streamConfigs, IdealState idealS List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList, IdealState idealState) { return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, - false); + false, false); } /** @@ -1213,10 +1213,24 @@ List getNewPartitionGroupMetadataList(List List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList, IdealState idealState, boolean forceGetOffsetFromStream) { + return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, + forceGetOffsetFromStream, false); + } + + /** + * Fetches the latest state of the PartitionGroups for the stream + * If any partition has reached end of life, and all messages of that partition have been consumed by the segment, + * it will be skipped from the result + */ + @VisibleForTesting + List getNewPartitionGroupMetadataList(List streamConfigs, + List currentPartitionGroupConsumptionStatusList, IdealState idealState, + boolean forceGetOffsetFromStream, boolean skipMissingTopics) { PauseState pauseState = extractTablePauseState(idealState); return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, - pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream); + pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream, + skipMissingTopics); } /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index dda082c80d29..69f4f84d959a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -62,6 +62,9 @@ public class StreamIngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Class to handle realtime offset auto reset") private String _realtimeOffsetAutoResetHandlerClass; + @JsonPropertyDescription("Whether to skip topics that don't exist during partition metadata fetch") + private boolean _skipMissingTopics; + @JsonCreator public StreamIngestionConfig(@JsonProperty("streamConfigMaps") List> streamConfigMaps) { _streamConfigMaps = streamConfigMaps; @@ -136,4 +139,12 @@ public String getRealtimeOffsetAutoResetHandlerClass() { public void setRealtimeOffsetAutoResetHandlerClass(String realtimeOffsetAutoResetHandlerClass) { _realtimeOffsetAutoResetHandlerClass = realtimeOffsetAutoResetHandlerClass; } + + public boolean isSkipMissingTopics() { + return _skipMissingTopics; + } + + public void setSkipMissingTopics(boolean skipMissingTopics) { + _skipMissingTopics = skipMissingTopics; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 0158dd50dc4d..6e480dadaba2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -38,6 +38,7 @@ public class PartitionGroupMetadataFetcher implements Callable { private final List _streamConfigs; private final List _partitionGroupConsumptionStatusList; private final boolean _forceGetOffsetFromStream; + private final boolean _skipMissingTopics; private final List _newPartitionGroupMetadataList = new ArrayList<>(); private final List _pausedTopicIndices; @@ -45,10 +46,11 @@ public class PartitionGroupMetadataFetcher implements Callable { public PartitionGroupMetadataFetcher(List streamConfigs, List partitionGroupConsumptionStatusList, List pausedTopicIndices, - boolean forceGetOffsetFromStream) { + boolean forceGetOffsetFromStream, boolean skipMissingTopics) { _streamConfigs = streamConfigs; _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; _forceGetOffsetFromStream = forceGetOffsetFromStream; + _skipMissingTopics = skipMissingTopics; _pausedTopicIndices = pausedTopicIndices; } @@ -130,10 +132,7 @@ private Boolean fetchMultipleStreams() // Check if the topic exists before fetching partition metadata // Only perform this check if topic existence validation is enabled and topics were fetched - boolean skipMissingTopics = Boolean.parseBoolean( - streamConfig.getStreamConfigsMap() - .getOrDefault(StreamConfigProperties.SKIP_MISSING_TOPICS, "false")); - if (skipMissingTopics && availableTopicNames != null && !availableTopicNames.contains(topicName)) { + if (_skipMissingTopics && availableTopicNames != null && !availableTopicNames.contains(topicName)) { LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); continue; } @@ -166,22 +165,17 @@ private Boolean fetchMultipleStreams() /** * Fetches available topic names from the stream provider. - * Uses the first stream config that has topic existence check enabled. + * Uses the first stream config to fetch topics. * - * @return Set of available topic names, or null if topics could not be fetched + * @return Set of available topic names, or null if topics could not be fetched or skip missing topics is disabled */ private Set fetchAvailableTopicNames() { - // Find first stream config with topic existence check enabled - StreamConfig streamConfigForTopicFetch = _streamConfigs.stream() - .filter(config -> Boolean.parseBoolean( - config.getStreamConfigsMap().getOrDefault(StreamConfigProperties.SKIP_MISSING_TOPICS, "false"))) - .findFirst() - .orElse(null); - - if (streamConfigForTopicFetch == null) { + if (!_skipMissingTopics || _streamConfigs.isEmpty()) { return null; } + StreamConfig streamConfigForTopicFetch = _streamConfigs.get(0); + String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-topicFetch-" + streamConfigForTopicFetch.getTableNameWithType(); StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfigForTopicFetch); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 40f647c429cd..71d6a9c203bd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -351,4 +351,17 @@ public static Map> getStreamConfigIndexToStreamPartitions( } return streamIndexToPartitions; } + + /** + * Returns whether to skip missing topics during partition metadata fetch. + * @param tableConfig the table config + * @return true if missing topics should be skipped, false otherwise + */ + public static boolean getSkipMissingTopics(TableConfig tableConfig) { + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + if (ingestionConfig != null && ingestionConfig.getStreamIngestionConfig() != null) { + return ingestionConfig.getStreamIngestionConfig().isSkipMissingTopics(); + } + return false; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index 3f6de5a78a69..6154e7291656 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -65,7 +65,7 @@ public void testFetchSingleStreamSuccess() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -99,7 +99,7 @@ public void testFetchSingleStreamTransientException() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -140,7 +140,7 @@ public void testFetchMultipleStreams() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -192,7 +192,7 @@ public void testFetchMultipleStreamsWithPause() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Arrays.asList(1), false); + streamConfigs, statusList, Arrays.asList(1), false, false); // Execute Boolean result = fetcher.call(); @@ -262,7 +262,7 @@ public void testFetchMultipleStreamsWithExceptionThrows() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig3)).thenReturn(factory3); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute and verify exception is thrown try { @@ -307,7 +307,7 @@ public void testFetchMultipleStreamsTransientExceptionStopsProcessing() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -371,7 +371,7 @@ public void testFetchMultipleStreamsSkipsNonExistentTopic() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(streamConfig3)).thenReturn(factory3); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -421,7 +421,7 @@ public void testFetchMultipleStreamsProceedsWhenGetTopicsUnsupported() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); @@ -475,7 +475,7 @@ public void testFetchMultipleStreamsTopicExistsCheckPasses() mockedProvider.when(() -> StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory); PartitionGroupMetadataFetcher fetcher = new PartitionGroupMetadataFetcher( - streamConfigs, statusList, Collections.emptyList(), false); + streamConfigs, statusList, Collections.emptyList(), false, false); // Execute Boolean result = fetcher.call(); From 44b6575f624cfa4112757f8c04ac0a1dacd864c5 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Thu, 26 Feb 2026 22:01:26 +0000 Subject: [PATCH 13/16] minor update --- .../pinot/controller/helix/SegmentStatusChecker.java | 2 +- .../table/ingestion/StreamIngestionConfig.java | 12 ++++++------ .../apache/pinot/spi/utils/IngestionConfigUtils.java | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 59523f1e432b..0dbd155c705d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -474,7 +474,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon if (tableType == TableType.REALTIME && tableConfig != null) { List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); - boolean skipMissingTopics = IngestionConfigUtils.getSkipMissingTopics(tableConfig); + boolean skipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTables(tableConfig); new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, streamConfigs, idealState, skipMissingTopics).findAndEmitMetrics(idealState); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index 69f4f84d959a..9405d2d104dc 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -62,8 +62,8 @@ public class StreamIngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Class to handle realtime offset auto reset") private String _realtimeOffsetAutoResetHandlerClass; - @JsonPropertyDescription("Whether to skip topics that don't exist during partition metadata fetch") - private boolean _skipMissingTopics; + @JsonPropertyDescription("Multitopic Tables : If true, non-existent topics will be skipped instead of causing failures. ") + private boolean _multitopicSkipMissingTables; @JsonCreator public StreamIngestionConfig(@JsonProperty("streamConfigMaps") List> streamConfigMaps) { @@ -140,11 +140,11 @@ public void setRealtimeOffsetAutoResetHandlerClass(String realtimeOffsetAutoRese _realtimeOffsetAutoResetHandlerClass = realtimeOffsetAutoResetHandlerClass; } - public boolean isSkipMissingTopics() { - return _skipMissingTopics; + public boolean isMultitopicSkipMissingTables() { + return _multitopicSkipMissingTables; } - public void setSkipMissingTopics(boolean skipMissingTopics) { - _skipMissingTopics = skipMissingTopics; + public void setMultitopicSkipMissingTables(boolean multitopicSkipMissingTables) { + _multitopicSkipMissingTables = multitopicSkipMissingTables; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 71d6a9c203bd..37bf7d9c584a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -357,10 +357,10 @@ public static Map> getStreamConfigIndexToStreamPartitions( * @param tableConfig the table config * @return true if missing topics should be skipped, false otherwise */ - public static boolean getSkipMissingTopics(TableConfig tableConfig) { + public static boolean getMultitopicSkipMissingTables(TableConfig tableConfig) { IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); if (ingestionConfig != null && ingestionConfig.getStreamIngestionConfig() != null) { - return ingestionConfig.getStreamIngestionConfig().isSkipMissingTopics(); + return ingestionConfig.getStreamIngestionConfig().isMultitopicSkipMissingTables(); } return false; } From 23ba19af3f83a7b89aba8e42305cebc17847a545 Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Thu, 26 Feb 2026 22:08:07 +0000 Subject: [PATCH 14/16] minor update --- .../pinot/controller/helix/SegmentStatusChecker.java | 4 ++-- .../helix/core/PinotTableIdealStateBuilder.java | 6 +++--- .../core/realtime/MissingConsumingSegmentFinder.java | 4 ++-- .../core/realtime/PinotLLCRealtimeSegmentManager.java | 4 ++-- .../config/table/ingestion/StreamIngestionConfig.java | 3 ++- .../spi/stream/PartitionGroupMetadataFetcher.java | 10 +++++----- 6 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 0dbd155c705d..ce5ace3325e4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -474,10 +474,10 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon if (tableType == TableType.REALTIME && tableConfig != null) { List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); - boolean skipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTables(tableConfig); + boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTables(tableConfig); new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, - streamConfigs, idealState, skipMissingTopics).findAndEmitMetrics(idealState); + streamConfigs, idealState, multitopicSkipMissingTopics).findAndEmitMetrics(idealState); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 67fba04182b2..8a118805405d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -89,14 +89,14 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n * and is created using the latest segment zk metadata. * @param pausedTopicIndices List of inactive topic indices. Index is the index of the topic in the streamConfigMaps. * @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset - * @param skipMissingTopics whether to skip topics that don't exist during partition metadata fetch + * @param multitopicSkipMissingTopics whether to skip topics that don't exist during partition metadata fetch */ public static List getPartitionGroupMetadataList(List streamConfigs, List partitionGroupConsumptionStatusList, List pausedTopicIndices, - boolean forceGetOffsetFromStream, boolean skipMissingTopics) { + boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) { PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new PartitionGroupMetadataFetcher( streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream, - skipMissingTopics); + multitopicSkipMissingTopics); try { DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index ea74a1ca3e3b..d7072ca884c8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -70,7 +70,7 @@ public class MissingConsumingSegmentFinder { public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore propertyStore, ControllerMetrics controllerMetrics, List streamConfigs, IdealState idealState, - boolean skipMissingTopics) { + boolean multitopicSkipMissingTopics) { _realtimeTableName = realtimeTableName; _controllerMetrics = controllerMetrics; _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics); @@ -86,7 +86,7 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt try { PauseState pauseState = PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState); PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), - pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false, skipMissingTopics) + pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false, multitopicSkipMissingTopics) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 5e2b39a31fea..73c659fe22c5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -1225,12 +1225,12 @@ List getNewPartitionGroupMetadataList(List @VisibleForTesting List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList, IdealState idealState, - boolean forceGetOffsetFromStream, boolean skipMissingTopics) { + boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) { PauseState pauseState = extractTablePauseState(idealState); return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream, - skipMissingTopics); + multitopicSkipMissingTopics); } /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index 9405d2d104dc..334a055d213f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -62,7 +62,8 @@ public class StreamIngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Class to handle realtime offset auto reset") private String _realtimeOffsetAutoResetHandlerClass; - @JsonPropertyDescription("Multitopic Tables : If true, non-existent topics will be skipped instead of causing failures. ") + @JsonPropertyDescription("Multitopic Tables: If true, non-existent topics will be skipped instead of " + + "causing failures.") private boolean _multitopicSkipMissingTables; @JsonCreator diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 6e480dadaba2..ac2d4bf343d8 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -38,7 +38,7 @@ public class PartitionGroupMetadataFetcher implements Callable { private final List _streamConfigs; private final List _partitionGroupConsumptionStatusList; private final boolean _forceGetOffsetFromStream; - private final boolean _skipMissingTopics; + private final boolean _multitopicSkipMissingTopics; private final List _newPartitionGroupMetadataList = new ArrayList<>(); private final List _pausedTopicIndices; @@ -46,11 +46,11 @@ public class PartitionGroupMetadataFetcher implements Callable { public PartitionGroupMetadataFetcher(List streamConfigs, List partitionGroupConsumptionStatusList, List pausedTopicIndices, - boolean forceGetOffsetFromStream, boolean skipMissingTopics) { + boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) { _streamConfigs = streamConfigs; _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; _forceGetOffsetFromStream = forceGetOffsetFromStream; - _skipMissingTopics = skipMissingTopics; + _multitopicSkipMissingTopics = multitopicSkipMissingTopics; _pausedTopicIndices = pausedTopicIndices; } @@ -132,7 +132,7 @@ private Boolean fetchMultipleStreams() // Check if the topic exists before fetching partition metadata // Only perform this check if topic existence validation is enabled and topics were fetched - if (_skipMissingTopics && availableTopicNames != null && !availableTopicNames.contains(topicName)) { + if (_multitopicSkipMissingTopics && availableTopicNames != null && !availableTopicNames.contains(topicName)) { LOGGER.warn("Topic {} does not exist. Skipping this topic from ingestion.", topicName); continue; } @@ -170,7 +170,7 @@ private Boolean fetchMultipleStreams() * @return Set of available topic names, or null if topics could not be fetched or skip missing topics is disabled */ private Set fetchAvailableTopicNames() { - if (!_skipMissingTopics || _streamConfigs.isEmpty()) { + if (!_multitopicSkipMissingTopics || _streamConfigs.isEmpty()) { return null; } From c2ba4eb1ad4f40fa15d89db9c35406f0dba25c7c Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Fri, 27 Feb 2026 00:01:55 +0000 Subject: [PATCH 15/16] minor update --- .../helix/SegmentStatusChecker.java | 2 +- .../PinotLLCRealtimeSegmentManager.java | 42 +++++++++---------- .../PinotLLCRealtimeSegmentManagerTest.java | 14 ++++--- .../spi/stream/StreamConfigProperties.java | 2 +- .../pinot/spi/utils/IngestionConfigUtils.java | 2 +- .../PartitionGroupMetadataFetcherTest.java | 2 +- 6 files changed, 31 insertions(+), 33 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index ce5ace3325e4..96fca237aac1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -474,7 +474,7 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon if (tableType == TableType.REALTIME && tableConfig != null) { List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); - boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTables(tableConfig); + boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics, streamConfigs, idealState, multitopicSkipMissingTopics).findAndEmitMetrics(idealState); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 73c659fe22c5..7991540cd701 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -378,8 +378,10 @@ public void stop() { */ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); + boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); List> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState).stream().map( + getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState, false, + multitopicSkipMissingTopics).stream().map( x -> Pair.of(x, STARTING_SEQUENCE_NUMBER) ).collect(Collectors.toList()); setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList); @@ -811,7 +813,8 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId(); List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); - Set partitionIds = getPartitionIds(streamConfigs, idealState); + boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); + Set partitionIds = getPartitionIds(streamConfigs, idealState, multitopicSkipMissingTopics); if (partitionIds.contains(committingSegmentPartitionGroupId)) { String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); @@ -1136,7 +1139,7 @@ Set getPartitionIds(StreamConfig streamConfig) } @VisibleForTesting - Set getPartitionIds(List streamConfigs, IdealState idealState) { + Set getPartitionIds(List streamConfigs, IdealState idealState, boolean multitopicSkipMissingTopics) { Set partitionIds = new HashSet<>(); boolean allPartitionIdsFetched = true; int numStreams = streamConfigs.size(); @@ -1184,7 +1187,8 @@ Set getPartitionIds(List streamConfigs, IdealState idealS List currentPartitionGroupConsumptionStatusList = getPartitionGroupConsumptionStatusList(idealState, streamConfigs); List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState); + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, + false, multitopicSkipMissingTopics); partitionIds.addAll(newPartitionGroupMetadataList.stream() .map(PartitionGroupMetadata::getPartitionGroupId) .collect(Collectors.toSet())); @@ -1192,18 +1196,6 @@ Set getPartitionIds(List streamConfigs, IdealState idealS return partitionIds; } - /** - * Fetches the latest state of the PartitionGroups for the stream - * If any partition has reached end of life, and all messages of that partition have been consumed by the segment, - * it will be skipped from the result - */ - @VisibleForTesting - List getNewPartitionGroupMetadataList(List streamConfigs, - List currentPartitionGroupConsumptionStatusList, IdealState idealState) { - return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, - false, false); - } - /** * Fetches the latest state of the PartitionGroups for the stream * If any partition has reached end of life, and all messages of that partition have been consumed by the segment, @@ -1212,9 +1204,9 @@ List getNewPartitionGroupMetadataList(List @VisibleForTesting List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList, IdealState idealState, - boolean forceGetOffsetFromStream) { + boolean multitopicSkipMissingTopics) { return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, - forceGetOffsetFromStream, false); + false, multitopicSkipMissingTopics); } /** @@ -1402,8 +1394,11 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, List streamConfig.setOffsetCriteria( offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA)); + boolean multitopicSkipMissingTopics = + IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState); + getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, + false, multitopicSkipMissingTopics); streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, offsetCriteria); @@ -1768,10 +1763,11 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List fetchPartitionGroupIdToSmallestOffset(List streamConfigs, - IdealState idealState) { + IdealState idealState, boolean multitopicSkipMissingTopics) { Map partitionGroupIdToSmallestOffset = new HashMap<>(); for (StreamConfig streamConfig : streamConfigs) { List currentPartitionGroupConsumptionStatusList = @@ -1889,7 +1885,7 @@ private Map fetchPartitionGroupIdToSmallestOf // The kafka implementation of computePartitionGroupMetadata() will ignore the current status // while the kinesis implementation will use it. List partitionGroupMetadataList = getNewPartitionGroupMetadataList( - streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, true); + streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, true, multitopicSkipMissingTopics); streamConfig.setOffsetCriteria(originalOffsetCriteria); for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 94f3abba16e9..5cdc1875f8e4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1622,7 +1622,7 @@ public void testGetPartitionIds() segmentManager._numPartitions = 2; // Test empty ideal state - Set partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState); + Set partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState, null); Assert.assertEquals(partitionIds.size(), 2); partitionIds.clear(); @@ -1638,8 +1638,8 @@ public void testGetPartitionIds() List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)), new PartitionGroupMetadata(1, new LongMsgOffset(345))); doReturn(partitionGroupMetadataList).when(segmentManagerSpy) - .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList, idealState); - partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState); + .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList, idealState, false, false); + partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState, null); Assert.assertEquals(partitionIds.size(), 2); } @@ -2106,7 +2106,8 @@ Set getPartitionIds(StreamConfig streamConfig) { @Override List getNewPartitionGroupMetadataList(List streamConfigs, - List currentPartitionGroupConsumptionStatusList, IdealState idealState) { + List currentPartitionGroupConsumptionStatusList, IdealState idealState, + boolean multitopicSkipMissingTopics) { if (_partitionGroupMetadataList != null) { return _partitionGroupMetadataList; } else { @@ -2118,8 +2119,9 @@ List getNewPartitionGroupMetadataList(List @Override List getNewPartitionGroupMetadataList(List streamConfigs, List currentPartitionGroupConsumptionStatusList, IdealState idealState, - boolean forceGetOffsetFromStream) { - return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState); + boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) { + return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, + multitopicSkipMissingTopics); } @Override diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java index b2ade311992e..f823a62dc4d3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java @@ -48,7 +48,7 @@ private StreamConfigProperties() { public static final String GROUP_ID = "hlc.group.id"; public static final String PARTITION_MSG_OFFSET_FACTORY_CLASS = "partition.offset.factory.class.name"; public static final String TOPIC_CONSUMPTION_RATE_LIMIT = "topic.consumption.rate.limit"; - public static final String SKIP_MISSING_TOPICS = "skip.missing.topics"; + public static final String MULTITOPIC_SKIP_MISSING_TOPICS = "multitopic.skip.missing.topics"; public static final String METADATA_POPULATE = "metadata.populate"; /** diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 37bf7d9c584a..f4f45144a94d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -357,7 +357,7 @@ public static Map> getStreamConfigIndexToStreamPartitions( * @param tableConfig the table config * @return true if missing topics should be skipped, false otherwise */ - public static boolean getMultitopicSkipMissingTables(TableConfig tableConfig) { + public static boolean getMultitopicSkipMissingTablesFlag(TableConfig tableConfig) { IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); if (ingestionConfig != null && ingestionConfig.getStreamIngestionConfig() != null) { return ingestionConfig.getStreamIngestionConfig().isMultitopicSkipMissingTables(); diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java index 6154e7291656..8ac9e8c85f0c 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java @@ -504,7 +504,7 @@ private StreamConfig createMockStreamConfig(String topicName, String tableName, when(streamConfig.getTableNameWithType()).thenReturn(tableName); Map configsMap = new HashMap<>(); if (topicExistenceCheckEnabled) { - configsMap.put(StreamConfigProperties.SKIP_MISSING_TOPICS, "true"); + configsMap.put(StreamConfigProperties.MULTITOPIC_SKIP_MISSING_TOPICS, "true"); } when(streamConfig.getStreamConfigsMap()).thenReturn(configsMap); return streamConfig; From d0be80c4572bd096902ae73f1b33066a55c7060a Mon Sep 17 00:00:00 2001 From: "abhishek.gupta" Date: Fri, 27 Feb 2026 01:55:00 +0000 Subject: [PATCH 16/16] address diff review comments --- .../helix/core/PinotTableIdealStateBuilder.java | 3 ++- .../realtime/MissingConsumingSegmentFinder.java | 3 ++- .../realtime/PinotLLCRealtimeSegmentManager.java | 9 +++++---- .../PinotLLCRealtimeSegmentManagerTest.java | 13 +++++++------ .../spi/stream/PartitionGroupMetadataFetcher.java | 3 ++- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 8a118805405d..cb0ca4b91ff4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -89,7 +89,8 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n * and is created using the latest segment zk metadata. * @param pausedTopicIndices List of inactive topic indices. Index is the index of the topic in the streamConfigMaps. * @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset - * @param multitopicSkipMissingTopics whether to skip topics that don't exist during partition metadata fetch + * @param multitopicSkipMissingTopics - In multitopic tables whether to skip topics that don't exist + * during partition metadata fetch */ public static List getPartitionGroupMetadataList(List streamConfigs, List partitionGroupConsumptionStatusList, List pausedTopicIndices, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index d7072ca884c8..705eb7f05367 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -86,7 +86,8 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt try { PauseState pauseState = PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState); PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), - pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false, multitopicSkipMissingTopics) + pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false, + multitopicSkipMissingTopics) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 7991540cd701..a3bc1683db72 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -380,7 +380,7 @@ public void setUpNewTable(TableConfig tableConfig, IdealState idealState) { List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig); List> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState, false, + getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState, multitopicSkipMissingTopics).stream().map( x -> Pair.of(x, STARTING_SEQUENCE_NUMBER) ).collect(Collectors.toList()); @@ -1139,7 +1139,8 @@ Set getPartitionIds(StreamConfig streamConfig) } @VisibleForTesting - Set getPartitionIds(List streamConfigs, IdealState idealState, boolean multitopicSkipMissingTopics) { + Set getPartitionIds(List streamConfigs, IdealState idealState, + boolean multitopicSkipMissingTopics) { Set partitionIds = new HashSet<>(); boolean allPartitionIdsFetched = true; int numStreams = streamConfigs.size(); @@ -1188,7 +1189,7 @@ Set getPartitionIds(List streamConfigs, IdealState idealS getPartitionGroupConsumptionStatusList(idealState, streamConfigs); List newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, - false, multitopicSkipMissingTopics); + multitopicSkipMissingTopics); partitionIds.addAll(newPartitionGroupMetadataList.stream() .map(PartitionGroupMetadata::getPartitionGroupId) .collect(Collectors.toSet())); @@ -1398,7 +1399,7 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, List newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, - false, multitopicSkipMissingTopics); + multitopicSkipMissingTopics); streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria)); return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList, offsetCriteria); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 5cdc1875f8e4..90cfe228a17a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -299,7 +299,7 @@ public void testCommitSegment() { // committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList List partitionGroupMetadataListWithout0 = segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(), - mock(IdealState.class)); + mock(IdealState.class), false); partitionGroupMetadataListWithout0.remove(0); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout0; @@ -853,7 +853,7 @@ public void testRepairs() { // 1 reached end of shard. List partitionGroupMetadataListWithout1 = segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(), - mock(IdealState.class)); + mock(IdealState.class), false); partitionGroupMetadataListWithout1.remove(1); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1; // noop @@ -1622,7 +1622,7 @@ public void testGetPartitionIds() segmentManager._numPartitions = 2; // Test empty ideal state - Set partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState, null); + Set partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState, false); Assert.assertEquals(partitionIds.size(), 2); partitionIds.clear(); @@ -1638,8 +1638,8 @@ public void testGetPartitionIds() List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)), new PartitionGroupMetadata(1, new LongMsgOffset(345))); doReturn(partitionGroupMetadataList).when(segmentManagerSpy) - .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList, idealState, false, false); - partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState, null); + .getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList, idealState, false); + partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState, false); Assert.assertEquals(partitionIds.size(), 2); } @@ -2024,7 +2024,8 @@ public void setUpNewTable() { public void ensureAllPartitionsConsuming() { ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState, - getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList(), mock(IdealState.class)), null); + getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList(), mock(IdealState.class), false), + null); } @Override diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index ac2d4bf343d8..b502510a26c5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -106,7 +106,8 @@ private Boolean fetchMultipleStreams() throws Exception { int numStreams = _streamConfigs.size(); - // Fetch available topics once and reuse across all streams (for topic existence validation) + // For multi topic tables - Fetch available topics once and reuse across all streams + // (for topic existence validation) Set availableTopicNames = fetchAvailableTopicNames(); for (int i = 0; i < numStreams; i++) {