Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,10 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon

if (tableType == TableType.REALTIME && tableConfig != null) {
List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig);

new MissingConsumingSegmentFinder(tableNameWithType, propertyStore, _controllerMetrics,
streamConfigs, idealState).findAndEmitMetrics(idealState);
streamConfigs, idealState, multitopicSkipMissingTopics).findAndEmitMetrics(idealState);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int n
* and is created using the latest segment zk metadata.
* @param pausedTopicIndices List of inactive topic indices. Index is the index of the topic in the streamConfigMaps.
* @param forceGetOffsetFromStream - details in PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
* @param multitopicSkipMissingTopics - In multitopic tables whether to skip topics that don't exist
* during partition metadata fetch
*/
public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
boolean forceGetOffsetFromStream) {
boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) {
PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new PartitionGroupMetadataFetcher(
streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream);
streamConfigs, partitionGroupConsumptionStatusList, pausedTopicIndices, forceGetOffsetFromStream,
multitopicSkipMissingTopics);
try {
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public class MissingConsumingSegmentFinder {
private ControllerMetrics _controllerMetrics;

public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore,
ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs, IdealState idealState) {
ControllerMetrics controllerMetrics, List<StreamConfig> streamConfigs, IdealState idealState,
boolean multitopicSkipMissingTopics) {
_realtimeTableName = realtimeTableName;
_controllerMetrics = controllerMetrics;
_segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
Expand All @@ -85,7 +86,8 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt
try {
PauseState pauseState = PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState);
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(),
pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false)
pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), false,
multitopicSkipMissingTopics)
.forEach(metadata -> {
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,10 @@ public void stop() {
*/
public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig);
List<Pair<PartitionGroupMetadata, Integer>> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState).stream().map(
getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState,
multitopicSkipMissingTopics).stream().map(
x -> Pair.of(x, STARTING_SEQUENCE_NUMBER)
).collect(Collectors.toList());
setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList);
Expand Down Expand Up @@ -811,7 +813,8 @@ private String createNewSegmentMetadata(TableConfig tableConfig, IdealState idea
int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();

List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState);
boolean multitopicSkipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig);
Set<Integer> partitionIds = getPartitionIds(streamConfigs, idealState, multitopicSkipMissingTopics);

if (partitionIds.contains(committingSegmentPartitionGroupId)) {
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
Expand Down Expand Up @@ -1136,7 +1139,8 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig)
}

@VisibleForTesting
Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState) {
Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealState,
boolean multitopicSkipMissingTopics) {
Set<Integer> partitionIds = new HashSet<>();
boolean allPartitionIdsFetched = true;
int numStreams = streamConfigs.size();
Expand Down Expand Up @@ -1184,7 +1188,8 @@ Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealS
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState);
getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState,
multitopicSkipMissingTopics);
partitionIds.addAll(newPartitionGroupMetadataList.stream()
.map(PartitionGroupMetadata::getPartitionGroupId)
.collect(Collectors.toSet()));
Expand All @@ -1199,9 +1204,10 @@ Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState idealS
*/
@VisibleForTesting
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, IdealState idealState) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, IdealState idealState,
boolean multitopicSkipMissingTopics) {
return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState,
false);
false, multitopicSkipMissingTopics);
}

/**
Expand All @@ -1212,11 +1218,12 @@ List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig>
@VisibleForTesting
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, IdealState idealState,
boolean forceGetOffsetFromStream) {
boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) {
PauseState pauseState = extractTablePauseState(idealState);
return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList,
pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream);
pauseState == null ? new ArrayList<>() : pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream,
multitopicSkipMissingTopics);
}

/**
Expand Down Expand Up @@ -1388,8 +1395,11 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamCon
streamConfigs.stream()
.forEach(streamConfig -> streamConfig.setOffsetCriteria(
offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
boolean multitopicSkipMissingTopics =
IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState);
getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState,
multitopicSkipMissingTopics);
streamConfigs.stream().forEach(streamConfig -> streamConfig.setOffsetCriteria(originalOffsetCriteria));
return ensureAllPartitionsConsuming(tableConfig, streamConfigs, idealState, newPartitionGroupMetadataList,
offsetCriteria);
Expand Down Expand Up @@ -1754,10 +1764,11 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, List<StreamConf
latestSegmentName, latestSegmentZKMetadata.getStatus());
continue;
}

boolean skipMissingTopics = IngestionConfigUtils.getMultitopicSkipMissingTablesFlag(tableConfig);
// Smallest offset is fetched from stream once and cached in partitionIdToSmallestOffset.
if (partitionIdToSmallestOffset == null) {
partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState);
partitionIdToSmallestOffset = fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState,
skipMissingTopics);
}

// Do not create new CONSUMING segment when the stream partition has reached end of life.
Expand Down Expand Up @@ -1858,7 +1869,7 @@ private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig str
}

private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(List<StreamConfig> streamConfigs,
IdealState idealState) {
IdealState idealState, boolean multitopicSkipMissingTopics) {
Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
for (StreamConfig streamConfig : streamConfigs) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
Expand All @@ -1875,7 +1886,7 @@ private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOf
// The kafka implementation of computePartitionGroupMetadata() will ignore the current status
// while the kinesis implementation will use it.
List<PartitionGroupMetadata> partitionGroupMetadataList = getNewPartitionGroupMetadataList(
streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, true);
streamConfigs, currentPartitionGroupConsumptionStatusList, idealState, true, multitopicSkipMissingTopics);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void testCommitSegment() {
// committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList
List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
mock(IdealState.class));
mock(IdealState.class), false);
partitionGroupMetadataListWithout0.remove(0);
segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout0;

Expand Down Expand Up @@ -853,7 +853,7 @@ public void testRepairs() {
// 1 reached end of shard.
List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
mock(IdealState.class));
mock(IdealState.class), false);
partitionGroupMetadataListWithout1.remove(1);
segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1;
// noop
Expand Down Expand Up @@ -1622,7 +1622,7 @@ public void testGetPartitionIds()
segmentManager._numPartitions = 2;

// Test empty ideal state
Set<Integer> partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState);
Set<Integer> partitionIds = segmentManager.getPartitionIds(streamConfigs, idealState, false);
Assert.assertEquals(partitionIds.size(), 2);
partitionIds.clear();

Expand All @@ -1638,8 +1638,8 @@ public void testGetPartitionIds()
List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
new PartitionGroupMetadata(1, new LongMsgOffset(345)));
doReturn(partitionGroupMetadataList).when(segmentManagerSpy)
.getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList, idealState);
partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState);
.getNewPartitionGroupMetadataList(streamConfigs, partitionGroupConsumptionStatusList, idealState, false);
partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, idealState, false);
Assert.assertEquals(partitionIds.size(), 2);
}

Expand Down Expand Up @@ -2024,7 +2024,8 @@ public void setUpNewTable() {

public void ensureAllPartitionsConsuming() {
ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState,
getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList(), mock(IdealState.class)), null);
getNewPartitionGroupMetadataList(_streamConfigs, Collections.emptyList(), mock(IdealState.class), false),
null);
}

@Override
Expand Down Expand Up @@ -2106,7 +2107,8 @@ Set<Integer> getPartitionIds(StreamConfig streamConfig) {

@Override
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, IdealState idealState) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, IdealState idealState,
boolean multitopicSkipMissingTopics) {
if (_partitionGroupMetadataList != null) {
return _partitionGroupMetadataList;
} else {
Expand All @@ -2118,8 +2120,9 @@ List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig>
@Override
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, IdealState idealState,
boolean forceGetOffsetFromStream) {
return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState);
boolean forceGetOffsetFromStream, boolean multitopicSkipMissingTopics) {
return getNewPartitionGroupMetadataList(streamConfigs, currentPartitionGroupConsumptionStatusList, idealState,
multitopicSkipMissingTopics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public class StreamIngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("Class to handle realtime offset auto reset")
private String _realtimeOffsetAutoResetHandlerClass;

@JsonPropertyDescription("Multitopic Tables: If true, non-existent topics will be skipped instead of "
+ "causing failures.")
private boolean _multitopicSkipMissingTables;

@JsonCreator
public StreamIngestionConfig(@JsonProperty("streamConfigMaps") List<Map<String, String>> streamConfigMaps) {
_streamConfigMaps = streamConfigMaps;
Expand Down Expand Up @@ -136,4 +140,12 @@ public String getRealtimeOffsetAutoResetHandlerClass() {
public void setRealtimeOffsetAutoResetHandlerClass(String realtimeOffsetAutoResetHandlerClass) {
_realtimeOffsetAutoResetHandlerClass = realtimeOffsetAutoResetHandlerClass;
}

public boolean isMultitopicSkipMissingTables() {
return _multitopicSkipMissingTables;
}

public void setMultitopicSkipMissingTables(boolean multitopicSkipMissingTables) {
_multitopicSkipMissingTables = multitopicSkipMissingTables;
}
}
Loading