-
Notifications
You must be signed in to change notification settings - Fork 13.6k
/
InternalTopicManager.java
138 lines (122 loc) · 6.61 KB
/
InternalTopicManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
 * Manages the internal topics (changelog / repartition topics) that Kafka Streams
 * creates on the brokers, via a {@link StreamsKafkaClient}.
 *
 * <p>Not thread-safe; intended to be used from a single thread.
 */
public class InternalTopicManager {

    private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);

    public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
    public static final String RETENTION_MS = "retention.ms";
    public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);

    // Maximum number of attempts to create the internal topics before giving up.
    private static final int MAX_TOPIC_READY_TRY = 5;
    // Back-off between retry attempts.
    private static final long TOPIC_READY_RETRY_BACKOFF_MS = 1000L;

    private final long windowChangeLogAdditionalRetention;
    private final int replicationFactor;
    private final StreamsKafkaClient streamsKafkaClient;

    /**
     * @param streamsKafkaClient                   client used to fetch metadata and create topics
     * @param replicationFactor                    desired replication factor for internal topics
     * @param windowChangeLogAdditionalRetention   extra retention (ms) added to windowed changelog topics
     */
    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
        this.streamsKafkaClient = streamsKafkaClient;
        this.replicationFactor = replicationFactor;
        this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
    }

    /**
     * Prepares a set of given internal topics.
     *
     * If a topic does not exist creates a new topic.
     * If a topic with the correct number of partitions exists ignores it.
     * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
     *
     * @param topics map from topic config to its required partition count
     * @throws StreamsException if the topics could not be created after {@link #MAX_TOPIC_READY_TRY}
     *         attempts, if an existing topic has the wrong partition count, or if the thread is
     *         interrupted while backing off between attempts
     */
    public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
        StreamsException lastException = null;
        for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
            try {
                final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
                final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
                // Recompute per attempt so that a broker shortage observed on an earlier
                // attempt does not permanently reduce the replication factor once the
                // cluster has recovered.
                int actualReplicationFactor = replicationFactor;
                if (metadata.brokers().size() > 0 && metadata.brokers().size() < replicationFactor) {
                    log.warn("The number of available brokers {} is less than the desired replication " +
                            "factor for streams internal topics {}. If running in production, consider " +
                            "increasing the number of available brokers.",
                            metadata.brokers().size(), replicationFactor);
                    actualReplicationFactor = metadata.brokers().size();
                }
                streamsKafkaClient.createTopics(topicsToBeCreated, actualReplicationFactor, windowChangeLogAdditionalRetention, metadata);
                return;
            } catch (StreamsException ex) {
                lastException = ex;
                // Keep the stack trace: pass the exception as the last argument.
                log.warn("Could not create internal topics (retry #{}): {}", i, ex.getMessage(), ex);
            }
            // No point backing off after the final attempt.
            if (i < MAX_TOPIC_READY_TRY - 1) {
                try {
                    Thread.sleep(TOPIC_READY_RETRY_BACKOFF_MS);
                } catch (InterruptedException e) {
                    // Restore the interrupt status and abort instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                    throw new StreamsException("Interrupted while waiting to retry internal topic creation.", e);
                }
            }
        }
        // Chain the last failure so callers see why creation kept failing.
        throw new StreamsException("Could not create internal topics.", lastException);
    }

    /**
     * Get the number of partitions for the given topics
     *
     * @param topics topic names to look up
     * @return map from topic name to partition count, containing only topics that exist
     */
    public Map<String, Integer> getNumPartitions(final Set<String> topics) {
        final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
        final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
        existingTopicPartitions.keySet().retainAll(topics);
        return existingTopicPartitions;
    }

    /**
     * Closes the underlying {@link StreamsKafkaClient}; failures are logged (with cause), not rethrown.
     */
    public void close() {
        try {
            streamsKafkaClient.close();
        } catch (IOException e) {
            log.warn("Could not close StreamsKafkaClient.", e);
        }
    }

    /**
     * Check the existing topics to have correct number of partitions; and return the non existing topics to be created
     *
     * @throws StreamsException if an existing topic's partition count differs from the required one
     */
    private Map<InternalTopicConfig, Integer> validateTopicPartitions(final Map<InternalTopicConfig, Integer> topicsPartitionsMap,
                                                                     final Map<String, Integer> existingTopicNamesPartitions) {
        final Map<InternalTopicConfig, Integer> topicsToBeCreated = new HashMap<>();
        for (Map.Entry<InternalTopicConfig, Integer> entry : topicsPartitionsMap.entrySet()) {
            final InternalTopicConfig topic = entry.getKey();
            final Integer partition = entry.getValue();
            if (existingTopicNamesPartitions.containsKey(topic.name())) {
                if (!existingTopicNamesPartitions.get(topic.name()).equals(partition)) {
                    throw new StreamsException("Existing internal topic " + topic.name() + " has invalid partitions." +
                            " Expected: " + partition + " Actual: " + existingTopicNamesPartitions.get(topic.name()) +
                            ". Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.");
                }
            } else {
                topicsToBeCreated.put(topic, partition);
            }
        }
        return topicsToBeCreated;
    }

    /**
     * Extracts the partition count of every topic present in the metadata response.
     */
    private Map<String, Integer> fetchExistingPartitionCountByTopic(final MetadataResponse metadata) {
        // The names of existing topics and corresponding partition counts
        final Map<String, Integer> existingPartitionCountByTopic = new HashMap<>();
        final Collection<MetadataResponse.TopicMetadata> topicsMetadata = metadata.topicMetadata();
        for (MetadataResponse.TopicMetadata topicMetadata : topicsMetadata) {
            existingPartitionCountByTopic.put(topicMetadata.topic(), topicMetadata.partitionMetadata().size());
        }
        return existingPartitionCountByTopic;
    }
}