Skip to content

Commit 571f508

Browse files
authored
KAFKA-17411: Create local state Standbys on start (#16922)
Instead of waiting until Tasks are assigned to us, we pre-emptively create a StandbyTask for each non-empty Task directory found on-disk. We do this before starting any StreamThreads, and on our first assignment (after joining the consumer group), we recycle any of these StandbyTasks that were assigned to us, either as an Active or a Standby. We can't just use these "initial Standbys" as-is, because they were constructed outside the context of a StreamThread, so we first have to update them with the context (log context, ChangelogReader, and source topics) of the thread that it has been assigned to. The motivation for this is to (in a later commit) read StateStore offsets for unowned Tasks from the StateStore itself, rather than the .checkpoint file, which we plan to deprecate and remove. There are a few additional benefits: Initializing these Tasks on start-up, instead of on-assignment, will reduce the time between a member joining the consumer group and beginning processing. This is especially important when active tasks are being moved over, for example, as part of a rolling restart. If a Task has corrupt data on-disk, it will be discovered on startup and wiped under EOS. This is preferable to wiping the state after being assigned the Task, because another instance may have non-corrupt data and would not need to restore (as much). There is a potential performance impact: we open all on-disk Task StateStores, and keep them all open until we have our first assignment. This could require large amounts of memory, in particular when there are a large number of local state stores on-disk. However, since old local state for Tasks we don't own is automatically cleaned up after a period of time, in practice, we will almost always only be dealing with the state that was last assigned to the local instance. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>, Matthias Sax <mjsax@apache.org>
1 parent 7366f04 commit 571f508

File tree

9 files changed

+551
-17
lines changed

9 files changed

+551
-17
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@
188188

189189
<!-- Streams -->
190190
<suppress checks="ClassFanOutComplexity"
191-
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
191+
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
192192

193193
<suppress checks="MethodLength"
194194
files="KTableImpl.java"/>

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ public class KafkaStreams implements AutoCloseable {
181181
protected final TopologyMetadata topologyMetadata;
182182
private final QueryableStoreProvider queryableStoreProvider;
183183
private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
184+
private final LogContext logContext;
184185

185186
GlobalStreamThread globalStreamThread;
186187
protected StateDirectory stateDirectory = null;
@@ -677,6 +678,9 @@ private void maybeSetRunning() {
677678
return;
678679
}
679680

681+
// all (alive) threads have received their assignment, close any remaining startup tasks, they're not needed
682+
stateDirectory.closeStartupTasks();
683+
680684
setState(State.RUNNING);
681685
}
682686

@@ -999,7 +1003,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
9991003
} else {
10001004
clientId = userClientId;
10011005
}
1002-
final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
1006+
logContext = new LogContext(String.format("stream-client [%s] ", clientId));
10031007
this.log = logContext.logger(getClass());
10041008
topologyMetadata.setLog(logContext);
10051009

@@ -1411,6 +1415,9 @@ private static HostInfo parseHostInfo(final String endPoint) {
14111415
*/
14121416
public synchronized void start() throws IllegalStateException, StreamsException {
14131417
if (setState(State.REBALANCING)) {
1418+
log.debug("Initializing STANDBY tasks for existing local state");
1419+
stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext);
1420+
14141421
log.debug("Starting Streams client");
14151422

14161423
if (globalStreamThread != null) {

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.Collection;
4545
import java.util.Collections;
4646
import java.util.HashMap;
47+
import java.util.HashSet;
4748
import java.util.LinkedList;
4849
import java.util.List;
4950
import java.util.Map;
@@ -166,11 +167,11 @@ public String toString() {
166167

167168
private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
168169

169-
private final String logPrefix;
170+
private String logPrefix;
170171

171172
private final TaskId taskId;
172173
private final boolean eosEnabled;
173-
private final ChangelogRegister changelogReader;
174+
private ChangelogRegister changelogReader;
174175
private final Collection<TopicPartition> sourcePartitions;
175176
private final Map<String, String> storeToChangelogTopic;
176177

@@ -222,6 +223,38 @@ public ProcessorStateManager(final TaskId taskId,
222223
log.debug("Created state store manager for task {}", taskId);
223224
}
224225

226+
/**
227+
* Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before
228+
* they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is
229+
* completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
230+
*/
231+
static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
232+
final boolean eosEnabled,
233+
final LogContext logContext,
234+
final StateDirectory stateDirectory,
235+
final Map<String, String> storeToChangelogTopic,
236+
final boolean stateUpdaterEnabled) {
237+
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, new HashSet<>(0), stateUpdaterEnabled);
238+
}
239+
240+
/**
241+
* Standby tasks initialized for local state on-startup are only partially initialized, because they are not yet
242+
* assigned to a StreamThread. Once assigned to a StreamThread, we complete their initialization here using the
243+
* assigned StreamThread's context.
244+
*/
245+
void assignToStreamThread(final LogContext logContext,
246+
final ChangelogRegister changelogReader,
247+
final Collection<TopicPartition> sourcePartitions) {
248+
if (this.changelogReader != null) {
249+
throw new IllegalStateException("Attempted to replace an existing changelogReader on a StateManager without closing it.");
250+
}
251+
this.sourcePartitions.clear();
252+
this.log = logContext.logger(ProcessorStateManager.class);
253+
this.logPrefix = logContext.logPrefix();
254+
this.changelogReader = changelogReader;
255+
this.sourcePartitions.addAll(sourcePartitions);
256+
}
257+
225258
void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext processorContext) {
226259
processorContext.uninitialize();
227260
for (final StateStore store : allStores) {
@@ -314,7 +347,7 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
314347
}
315348

316349
private void maybeRegisterStoreWithChangelogReader(final String storeName) {
317-
if (isLoggingEnabled(storeName)) {
350+
if (isLoggingEnabled(storeName) && changelogReader != null) {
318351
changelogReader.register(getStorePartition(storeName), this);
319352
}
320353
}
@@ -569,7 +602,7 @@ public void flushCache() {
569602
public void close() throws ProcessorStateException {
570603
log.debug("Closing its state manager and all the registered state stores: {}", stores);
571604

572-
if (!stateUpdaterEnabled) {
605+
if (!stateUpdaterEnabled && changelogReader != null) {
573606
changelogReader.unregister(getAllChangelogTopicPartitions());
574607
}
575608

@@ -610,7 +643,7 @@ public void close() throws ProcessorStateException {
610643
void recycle() {
611644
log.debug("Recycling state for {} task {}.", taskType, taskId);
612645

613-
if (!stateUpdaterEnabled) {
646+
if (!stateUpdaterEnabled && changelogReader != null) {
614647
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
615648
changelogReader.unregister(allChangelogs);
616649
}

streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616
*/
1717
package org.apache.kafka.streams.processor.internals;
1818

19+
import org.apache.kafka.common.utils.LogContext;
1920
import org.apache.kafka.common.utils.Time;
2021
import org.apache.kafka.common.utils.Utils;
2122
import org.apache.kafka.streams.StreamsConfig;
2223
import org.apache.kafka.streams.errors.ProcessorStateException;
2324
import org.apache.kafka.streams.errors.StreamsException;
25+
import org.apache.kafka.streams.errors.TaskCorruptedException;
26+
import org.apache.kafka.streams.internals.StreamsConfigUtils;
2427
import org.apache.kafka.streams.processor.TaskId;
28+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
29+
import org.apache.kafka.streams.state.internals.ThreadCache;
2530

2631
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
2732
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -45,11 +50,16 @@
4550
import java.util.Arrays;
4651
import java.util.Collections;
4752
import java.util.HashMap;
53+
import java.util.HashSet;
4854
import java.util.List;
55+
import java.util.Map;
4956
import java.util.Objects;
5057
import java.util.Set;
5158
import java.util.UUID;
59+
import java.util.concurrent.ConcurrentHashMap;
60+
import java.util.concurrent.ConcurrentMap;
5261
import java.util.concurrent.atomic.AtomicReference;
62+
import java.util.function.Predicate;
5363
import java.util.regex.Pattern;
5464
import java.util.stream.Collectors;
5565

@@ -100,6 +110,9 @@ public StateDirectoryProcessFile() {
100110
private FileChannel stateDirLockChannel;
101111
private FileLock stateDirLock;
102112

113+
private final StreamsConfig config;
114+
private final ConcurrentMap<TaskId, Task> tasksForLocalState = new ConcurrentHashMap<>();
115+
103116
/**
104117
* Ensures that the state base directory as well as the application's sub-directory are created.
105118
*
@@ -118,6 +131,7 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean
118131
this.hasPersistentStores = hasPersistentStores;
119132
this.hasNamedTopologies = hasNamedTopologies;
120133
this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
134+
this.config = config;
121135
final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
122136
final File baseDir = new File(stateDirName);
123137
stateDir = new File(baseDir, appId);
@@ -182,6 +196,104 @@ private boolean lockStateDirectory() {
182196
return stateDirLock != null;
183197
}
184198

199+
public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
200+
final StreamsMetricsImpl streamsMetrics,
201+
final LogContext logContext) {
202+
final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
203+
if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
204+
final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
205+
final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
206+
final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
207+
208+
// discover all non-empty task directories in StateDirectory
209+
for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
210+
final String dirName = taskDirectory.file().getName();
211+
final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
212+
final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
213+
214+
// we still check if the task's sub-topology is stateful, even though we know its directory contains state,
215+
// because it's possible that the topology has changed since that data was written, and is now stateless
216+
// this therefore prevents us from creating unnecessary Tasks just because of some left-over state
217+
if (subTopology.hasStateWithChangelogs()) {
218+
final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
219+
id,
220+
eosEnabled,
221+
logContext,
222+
this,
223+
subTopology.storeToChangelogTopic(),
224+
stateUpdaterEnabled
225+
);
226+
227+
final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
228+
id,
229+
config,
230+
stateManager,
231+
streamsMetrics,
232+
dummyCache
233+
);
234+
235+
final Task task = new StandbyTask(
236+
id,
237+
new HashSet<>(),
238+
subTopology,
239+
topologyMetadata.taskConfig(id),
240+
streamsMetrics,
241+
stateManager,
242+
this,
243+
dummyCache,
244+
context
245+
);
246+
247+
try {
248+
task.initializeIfNeeded();
249+
250+
tasksForLocalState.put(id, task);
251+
} catch (final TaskCorruptedException e) {
252+
// Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
253+
task.suspend();
254+
task.closeDirty();
255+
}
256+
}
257+
}
258+
}
259+
}
260+
261+
public boolean hasStartupTasks() {
262+
return !tasksForLocalState.isEmpty();
263+
}
264+
265+
public Task removeStartupTask(final TaskId taskId) {
266+
final Task task = tasksForLocalState.remove(taskId);
267+
if (task != null) {
268+
lockedTasksToOwner.replace(taskId, Thread.currentThread());
269+
}
270+
return task;
271+
}
272+
273+
public void closeStartupTasks() {
274+
closeStartupTasks(t -> true);
275+
}
276+
277+
private void closeStartupTasks(final Predicate<Task> predicate) {
278+
if (!tasksForLocalState.isEmpty()) {
279+
// "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
280+
final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
281+
for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
282+
if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {
283+
// only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
284+
// to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
285+
drainedTasks.add(entry.getValue());
286+
}
287+
}
288+
289+
// now that we have exclusive ownership of the drained tasks, close them
290+
for (final Task task : drainedTasks) {
291+
task.suspend();
292+
task.closeClean();
293+
}
294+
}
295+
}
296+
185297
public UUID initializeProcessId() {
186298
if (!hasPersistentStores) {
187299
final UUID processId = UUID.randomUUID();
@@ -379,9 +491,17 @@ synchronized void unlock(final TaskId taskId) {
379491
}
380492
}
381493

494+
/**
495+
* Expose for tests.
496+
*/
497+
Thread lockOwner(final TaskId taskId) {
498+
return lockedTasksToOwner.get(taskId);
499+
}
500+
382501
@Override
383502
public void close() {
384503
if (hasPersistentStores) {
504+
closeStartupTasks();
385505
try {
386506
stateDirLock.release();
387507
stateDirLockChannel.close();
@@ -499,6 +619,7 @@ private IOException maybeCleanEmptyNamedTopologyDirs(final boolean logExceptionA
499619
);
500620
if (namedTopologyDirs != null) {
501621
for (final File namedTopologyDir : namedTopologyDirs) {
622+
closeStartupTasks(task -> task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
502623
final File[] contents = namedTopologyDir.listFiles();
503624
if (contents != null && contents.length == 0) {
504625
try {
@@ -536,6 +657,7 @@ public void clearLocalStateForNamedTopology(final String topologyName) {
536657
log.debug("Tried to clear out the local state for NamedTopology {} but none was found", topologyName);
537658
}
538659
try {
660+
closeStartupTasks(task -> task.id().topologyName().equals(topologyName));
539661
Utils.delete(namedTopologyDir);
540662
} catch (final IOException e) {
541663
log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e);

0 commit comments

Comments
 (0)