From 23ce8e8c38df295aff05050cb23059dd9c446ef6 Mon Sep 17 00:00:00 2001
From: Eduwer Camacaro
Date: Wed, 22 Oct 2025 10:54:41 -0500
Subject: [PATCH] refactor startup state stores initialization

---
 .../KafkaStreamsTelemetryIntegrationTest.java |   9 +-
 .../apache/kafka/streams/KafkaStreams.java    |   2 +-
 .../kafka/streams/processor/StateStore.java   |   2 +
 .../internals/ProcessorStateManager.java      |  46 +++-
 .../internals/StandbyTaskCreator.java         |  29 +++
 .../processor/internals/StateDirectory.java   | 210 +++++++++++++-----
 .../processor/internals/TaskManager.java      |  22 +-
 .../streams/state/internals/RocksDBStore.java |  20 +-
 .../state/internals/WrappedStateStore.java    |   5 +
 .../kafka/streams/KafkaStreamsTest.java       |   4 +-
 .../internals/StateDirectoryTest.java         |  17 +-
 .../processor/internals/TaskManagerTest.java  |   9 +-
 .../state/internals/RocksDBStoreTest.java     |  10 +-
 .../apache/kafka/test/MockKeyValueStore.java  |   5 +
 14 files changed, 301 insertions(+), 89 deletions(-)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index b479446389a9a..8594ea68c95d3 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -281,6 +281,11 @@ public void shouldPassMetrics(final String topologyType, final String groupProtocol) throws Exception {
         streamsApplicationProperties = props(groupProtocol);
         final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
 
+        shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT);
+        shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT);
+    }
+
+    private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception {
         try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
@@ -292,8 +297,8 @@ public void shouldPassMetrics(final String topologyType, final String groupProtocol) throws Exception {
 
-            final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList();
-            final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
+            final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList();
+            final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList();
 
             assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 69012f0c3135a..c62e24aae5a54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1376,7 +1376,7 @@ private static HostInfo parseHostInfo(final String endPoint) {
     public synchronized void start() throws IllegalStateException, StreamsException {
         if (setState(State.REBALANCING)) {
             log.debug("Initializing STANDBY tasks for existing local state");
tasks for existing local state"); - stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext); + stateDirectory.initializeStartupTasks(topologyMetadata, logContext); log.debug("Starting Streams client"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 38a3e23e28a1e..079957eb2fef0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -71,6 +71,8 @@ public interface StateStore { */ void init(final StateStoreContext stateStoreContext, final StateStore root); + default void preInit(final StateStoreContext stateStoreContext) {}; + /** * Flush any cached data */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3506845d288af..5aebffd97fe71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static java.lang.String.format; @@ -176,6 +177,7 @@ public String toString() { // must be maintained in topological order private final FixedOrderMap stores = new FixedOrderMap<>(); + private final Map startupStores = new HashMap<>(); private final FixedOrderMap globalStores = new FixedOrderMap<>(); private final File baseDir; @@ -185,6 +187,7 @@ public String toString() { private TaskType taskType; private Logger log; private Task.State taskState; + private final AtomicBoolean startupState; public static String storeChangelogTopic(final String prefix, final String storeName, final String namedTopology) { if (namedTopology == null) { @@ -205,7 +208,8 @@ public ProcessorStateManager(final TaskId taskId, final ChangelogRegister changelogReader, final Map storeToChangelogTopic, final Collection sourcePartitions, - final boolean stateUpdaterEnabled) throws ProcessorStateException { + final boolean stateUpdaterEnabled, + final boolean startupState) throws ProcessorStateException { this.storeToChangelogTopic = storeToChangelogTopic; this.log = logContext.logger(ProcessorStateManager.class); this.logPrefix = logContext.logPrefix(); @@ -220,6 +224,22 @@ public ProcessorStateManager(final TaskId taskId, this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId)); log.debug("Created state store manager for task {}", taskId); + this.startupState = new AtomicBoolean(startupState); + } + + /** + * @throws ProcessorStateException if the task directory does not exist and could not be created + */ + public ProcessorStateManager(final TaskId taskId, + final TaskType taskType, + final boolean eosEnabled, + final LogContext logContext, + final StateDirectory stateDirectory, + final ChangelogRegister changelogReader, + final Map storeToChangelogTopic, + final Collection sourcePartitions, + final boolean stateUpdaterEnabled) throws ProcessorStateException { + this(taskId, taskType, eosEnabled, logContext, stateDirectory, changelogReader, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled, false); } /** @@ -234,7 +254,7 @@ static ProcessorStateManager 
                                                            final Map<String, String> storeToChangelogTopic,
                                                            final Set<TopicPartition> sourcePartitions,
                                                            final boolean stateUpdaterEnabled) {
-        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled);
+        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled, true);
     }
 
     /**
@@ -255,6 +275,10 @@ void assignToStreamThread(final LogContext logContext,
         this.sourcePartitions.addAll(sourcePartitions);
     }
 
+    void reuseState() {
+        startupState.set(false);
+    }
+
     void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext processorContext) {
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
@@ -263,7 +287,13 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext processorContext) {
                 maybeRegisterStoreWithChangelogReader(store.name());
                 }
             } else {
-                store.init(processorContext, store);
+                if (startupState.get()) {
+                    store.preInit(processorContext);
+                    startupStores.put(store.name(), store);
+                } else {
+                    store.init(processorContext, store);
+                    startupStores.remove(store.name());
+                }
             }
             log.trace("Registered state store {}", store.name());
         }
@@ -649,9 +679,19 @@ else if (exception instanceof StreamsException)
                 }
             }
+
             stores.clear();
         }
 
+        if (!startupStores.isEmpty()) {
+            for (final Map.Entry<String, StateStore> entry : startupStores.entrySet()) {
+                final StateStore store = entry.getValue();
+                store.close();
+            }
+            startupStores.clear();
+        }
+
         if (firstException != null) {
             throw firstException;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index eb8bcafea695a..bf518e43468c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -165,6 +165,35 @@ StandbyTask createStandbyTask(final TaskId taskId,
         return task;
     }
 
+    StandbyTask createStandbyTaskFromStartupLocalStore(final TaskId taskId,
+                                                       final Set<TopicPartition> inputPartitions,
+                                                       final ProcessorTopology topology,
+                                                       final ProcessorStateManager stateManager) {
+        stateManager.reuseState();
+        final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+            taskId,
+            applicationConfig,
+            stateManager,
+            streamsMetrics,
+            dummyCache
+        );
+        final StandbyTask task = new StandbyTask(
+            taskId,
+            inputPartitions,
+            topology,
+            topologyMetadata.taskConfig(taskId),
+            streamsMetrics,
+            stateManager,
+            stateDirectory,
+            dummyCache,
+            context
+        );
+
+        log.trace("Created standby task {} with assigned partitions {}", taskId, inputPartitions);
+        createTaskSensor.record();
+        return task;
+    }
+
     private LogContext getLogContext(final TaskId taskId) {
         final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
         final String logPrefix = threadIdPrefix + String.format("%s [%s] ", "standby-task", taskId);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index a95d20ddae0a1..14bb33ccf4704 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -17,16 +17,24 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -47,6 +55,7 @@
 import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -112,7 +121,7 @@ public StateDirectoryProcessFile() {
     private FileLock stateDirLock;
     private final StreamsConfig config;
 
-    private final ConcurrentMap<TaskId, Task> tasksForLocalState = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TaskId, StartupState> tasksForLocalState = new ConcurrentHashMap<>();
 
     /**
      * Ensures that the state base directory as well as the application's sub-directory are created.
@@ -198,11 +207,9 @@ private boolean lockStateDirectory() {
     }
 
     public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
-                                       final StreamsMetricsImpl streamsMetrics,
                                        final LogContext logContext) {
         final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
         if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
-            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
             final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
             final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
@@ -221,44 +228,23 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
                     .map(t -> new TopicPartition(t, id.partition()))
                     .collect(Collectors.toSet());
                 final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
-                        id,
-                        eosEnabled,
-                        logContext,
-                        this,
-                        subTopology.storeToChangelogTopic(),
-                        inputPartitions,
-                        stateUpdaterEnabled
+                    id,
+                    eosEnabled,
+                    logContext,
+                    this,
+                    subTopology.storeToChangelogTopic(),
+                    inputPartitions,
+                    stateUpdaterEnabled
                 );
-
-                final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                    id,
-                    config,
-                    stateManager,
-                    streamsMetrics,
-                    dummyCache
-                );
-
-                final Task task = new StandbyTask(
-                    id,
-                    inputPartitions,
-                    subTopology,
-                    topologyMetadata.taskConfig(id),
-                    streamsMetrics,
-                    stateManager,
-                    this,
-                    dummyCache,
-                    context
-                );
-
-                try {
-                    task.initializeIfNeeded();
-
-                    tasksForLocalState.put(id, task);
-                } catch (final TaskCorruptedException e) {
-                    // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                    task.suspend();
-                    task.closeDirty();
+                final StartupContext initContext = new StartupContext(id, config, stateManager);
+                // TODO: we need to pass a proper logPrefix
+                StateManagerUtil.registerStateStores(log, "", subTopology, stateManager, this, initContext);
+                for (final StateStore stateStore : subTopology.stateStores()) {
+                    if (!stateStore.isOpen()) {
+                        throw new IllegalStateException("StateStore [" + stateStore.name() + "] is not open");
+                    }
                 }
+                tasksForLocalState.put(id, new StartupState(id, subTopology, stateManager));
             }
         }
     }
@@ -268,8 +254,8 @@ public boolean hasStartupTasks() {
         return !tasksForLocalState.isEmpty();
     }
 
-    public Task removeStartupTask(final TaskId taskId) {
-        final Task task = tasksForLocalState.remove(taskId);
+    public StartupState removeStartupTask(final TaskId taskId) {
+        final StartupState task = tasksForLocalState.remove(taskId);
         if (task != null) {
             lockedTasksToOwner.replace(taskId, Thread.currentThread());
         }
@@ -280,11 +266,11 @@ public void closeStartupTasks() {
         closeStartupTasks(t -> true);
     }
 
-    private void closeStartupTasks(final Predicate<Task> predicate) {
+    private void closeStartupTasks(final Predicate<StartupState> predicate) {
         if (!tasksForLocalState.isEmpty()) {
             // "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
-            final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
-            for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
+            final Set<StartupState> drainedTasks = new HashSet<>(tasksForLocalState.size());
+            for (final Map.Entry<TaskId, StartupState> entry : tasksForLocalState.entrySet()) {
                 if (predicate.test(entry.getValue()) && removeStartupTask(entry.getKey()) != null) {
                     // only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
                     // to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
                     drainedTasks.add(entry.getValue());
                 }
             }
 
@@ -293,9 +279,8 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
             }
 
             // now that we have exclusive ownership of the drained tasks, close them
-            for (final Task task : drainedTasks) {
-                task.suspend();
-                task.closeClean();
+            for (final StartupState localState : drainedTasks) {
+                localState.close();
             }
         }
     }
@@ -624,7 +609,7 @@ private IOException maybeCleanEmptyNamedTopologyDirs(final boolean logExceptionAsWarn) {
         );
         if (namedTopologyDirs != null) {
             for (final File namedTopologyDir : namedTopologyDirs) {
-                closeStartupTasks(task -> task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
+                closeStartupTasks(localState -> localState.getTaskId().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
                 final File[] contents = namedTopologyDir.listFiles();
                 if (contents != null && contents.length == 0) {
                     try {
@@ -662,7 +647,7 @@ public void clearLocalStateForNamedTopology(final String topologyName) {
             log.debug("Tried to clear out the local state for NamedTopology {} but none was found", topologyName);
         }
         try {
-            closeStartupTasks(task -> task.id().topologyName().equals(topologyName));
+            closeStartupTasks(localState -> localState.getTaskId().topologyName().equals(topologyName));
             Utils.delete(namedTopologyDir);
         } catch (final IOException e) {
             log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e);
@@ -806,4 +791,129 @@ public int hashCode() {
         return Objects.hash(file, namedTopology);
     }
 }
+
+    public class StartupState {
+        private final ProcessorTopology topology;
+        private final ProcessorStateManager stateMngr;
+        private final TaskId taskId;
+
+        public StartupState(final TaskId taskId, final ProcessorTopology topology, final ProcessorStateManager stateMngr) {
+            this.topology = topology;
+            this.stateMngr = stateMngr;
+            this.taskId = taskId;
+        }
+
+        public ProcessorStateManager getStateMngr() {
+            return stateMngr;
+        }
+
+        public ProcessorTopology getTopology() {
+            return topology;
+        }
+
+        public TaskId getTaskId() {
+            return taskId;
+        }
+
+        public void close() {
+            if (lock(taskId)) {
+                try {
+                    stateMngr.close();
+                } finally {
+                    unlock(taskId);
+                }
+            }
+        }
+    }
+
+    private static class StartupContext extends AbstractProcessorContext<Object, Object> {
+
+        private final StateManager stateManager;
+
+        public StartupContext(final TaskId taskId, final StreamsConfig config, final StateManager stateManager) {
+            super(taskId, config, null, null);
+            this.stateManager = stateManager;
+        }
+
+        @Override
+        protected StateManager stateManager() {
+            return stateManager;
+        }
+
+        @Override
+        public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
+            throw new IllegalStateException("Should not be called");
+        }
+
+        @Override
+        public void transitionToStandby(final ThreadCache newCache) {
+            throw new IllegalStateException("Should not be called");
+        }
+
+        @Override
+        public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
+            throw new IllegalStateException("Should not be called");
+        }
+
+        @Override
+        public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp, final Position position) {
+            throw new IllegalStateException("Should not be called");
+        }
+
+        @Override
+        public <K, V> void forward(final K key, final V value) {
+            throw new IllegalStateException("Should not be called");
+        }
+
+        @Override
+        public <K, V> void forward(final K key, final V value, final To to) {
+            throw new IllegalStateException("Should not be called");
+        }
IllegalStateException("Should not be called"); + } + + @Override + public void commit() { + throw new IllegalStateException("Should not be called"); + } + + @Override + public long currentStreamTimeMs() { + throw new IllegalStateException("Should not be called"); + } + + @Override + public StreamsMetricsImpl metrics() { + throw new IllegalStateException("Should not be called"); + } + + @Override + public S getStateStore(final String name) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) { + throw new IllegalStateException("Should not be called"); + } + + + @Override + public void forward(final FixedKeyRecord record) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void forward(final FixedKeyRecord record, final String childName) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void forward(final Record record) { + throw new IllegalStateException("Should not be called"); + } + + @Override + public void forward(final Record record, final String childName) { + throw new IllegalStateException("Should not be called"); + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 67d009b037f78..5ae69bc2d5376 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -333,25 +333,19 @@ private void closeDirtyAndRevive(final Collection taskWithChangelogs, fina } } - private Map> assignStartupTasks(final Map> tasksToAssign, - final String threadLogPrefix, - final TopologyMetadata topologyMetadata, - final ChangelogRegister changelogReader) { + private Map> assignStartupTasks(final Map> tasksToAssign) { if (stateDirectory.hasStartupTasks()) { final Map> assignedTasks = new HashMap<>(tasksToAssign.size()); for (final Map.Entry> entry : tasksToAssign.entrySet()) { final TaskId taskId = entry.getKey(); - final Task task = stateDirectory.removeStartupTask(taskId); - if (task != null) { + final StateDirectory.StartupState localState = stateDirectory.removeStartupTask(taskId); + if (localState != null) { // replace our dummy values with the real ones, now we know our thread and assignment final Set inputPartitions = entry.getValue(); - task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions); - updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions); - + final Task task = standbyTaskCreator.createStandbyTaskFromStartupLocalStore(taskId, inputPartitions, localState.getTopology(), localState.getStateMngr()); assignedTasks.put(task, inputPartitions); } } - return assignedTasks; } else { return Collections.emptyMap(); @@ -487,8 +481,8 @@ private void handleTasksWithoutStateUpdater(final Map> standbyTasksToCreate, final Map> tasksToRecycle, final Set tasksToCloseClean) { - final Map> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader); - final Map> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader); + final Map> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate); + final Map> startupStandbyTasksToUse = 
 
         // recycle the startup standbys to active
         tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet());
@@ -571,8 +565,8 @@ private void handleTasksPendingInitialization() {
 
     private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
                                         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                         final Map<TaskId, RuntimeException> failedTasks) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader);
+        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate);
+        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate);
 
         // recycle the startup standbys to active, and remove them from the set of actives that need to be created
         if (!startupStandbyTasksToRecycle.isEmpty()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index ede618237cf35..55492fe301488 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -80,6 +80,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
@@ -132,6 +133,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore {
     protected StateStoreContext context;
     protected Position position;
     private OffsetCheckpoint positionCheckpoint;
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
 
     public RocksDBStore(final String name,
                         final String metricsScope) {
@@ -157,9 +159,14 @@ public RocksDBStore(final String name,
 
     @Override
     public void init(final StateStoreContext stateStoreContext, final StateStore root) {
+        initialized.set(true);
         // open the DB dir
         metricsRecorder.init(metricsImpl(stateStoreContext), stateStoreContext.taskId());
-        openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
+        if (!open) {
+            preInit(stateStoreContext);
+        }
+
+        addValueProvidersToMetricsRecorder();
 
         final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
         this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
@@ -179,6 +186,11 @@ public void init(final StateStoreContext stateStoreContext,
             false);
     }
 
+    @Override
+    public void preInit(final StateStoreContext stateStoreContext) {
+        openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
+    }
+
     @SuppressWarnings("unchecked")
     void openDB(final Map<String, Object> configs, final File stateDir) {
         // initialize the default rocksdb options
@@ -242,7 +254,6 @@ void openDB(final Map<String, Object> configs, final File stateDir) {
 
         dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
         open = true;
-        addValueProvidersToMetricsRecorder();
     }
 
     private void setupStatistics(final Map<String, Object> configs, final DBOptions dbOptions) {
@@ -677,8 +688,9 @@ public synchronized void close() {
             configSetter.close(name, userSpecifiedOptions);
             configSetter = null;
         }
-
-        metricsRecorder.removeValueProviders(name);
+        if (initialized.get()) {
+            metricsRecorder.removeValueProviders(name);
+        }
 
         // Important: do not rearrange the order in which the below objects are closed!
         // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index adbb7568c87c5..a2bcc50d1ea39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -58,6 +58,11 @@ public WrappedStateStore(final S wrapped) {
         this.wrapped = wrapped;
     }
 
+    @Override
+    public void preInit(final StateStoreContext stateStoreContext) {
+        wrapped.preInit(stateStoreContext);
+    }
+
     @Override
     public void init(final StateStoreContext stateStoreContext, final StateStore root) {
         wrapped.init(stateStoreContext, root);
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 14d4cf8c21fb3..2068a5ff9a725 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -416,9 +416,9 @@ public void shouldInitializeTasksForLocalStateOnStart() {
         try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             assertEquals(1, constructed.constructed().size());
             final StateDirectory stateDirectory = constructed.constructed().get(0);
-            verify(stateDirectory, times(0)).initializeStartupTasks(any(), any(), any());
+            verify(stateDirectory, times(0)).initializeStartupTasks(any(), any());
             streams.start();
-            verify(stateDirectory, times(1)).initializeStartupTasks(any(), any(), any());
+            verify(stateDirectory, times(1)).initializeStartupTasks(any(), any());
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 3b795310b3615..ec11015b1f50f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -28,7 +27,6 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.TestUtils;
@@ -844,15 +842,15 @@ public void shouldInitializeStandbyTasksForLocalState() {
     public void shouldNotAssignStartupTasksWeDontHave() {
         final TaskId taskId = new TaskId(0, 0);
         initializeStartupTasks(taskId, false);
-        final Task task = directory.removeStartupTask(taskId);
+        final StateDirectory.StartupState task = directory.removeStartupTask(taskId);
         assertNull(task);
     }
 
     private class FakeStreamThread extends Thread {
         private final TaskId taskId;
-        private final AtomicReference<Task> result;
+        private final AtomicReference<StateDirectory.StartupState> result;
 
-        private FakeStreamThread(final TaskId taskId, final AtomicReference<Task> result) {
+        private FakeStreamThread(final TaskId taskId, final AtomicReference<StateDirectory.StartupState> result) {
             this.taskId = taskId;
             this.result = result;
         }
@@ -873,14 +871,13 @@ public void shouldAssignStartupTaskToStreamThread() throws InterruptedException {
         assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
 
         // spawn off a "fake" StreamThread, so we can verify the lock was updated to the correct thread
-        final AtomicReference<Task> result = new AtomicReference<>();
+        final AtomicReference<StateDirectory.StartupState> result = new AtomicReference<>();
         final Thread streamThread = new FakeStreamThread(taskId, result);
         streamThread.start();
         streamThread.join();
-        final Task task = result.get();
+        final StateDirectory.StartupState localState = result.get();
 
-        assertNotNull(task);
-        assertThat(task, instanceOf(StandbyTask.class));
+        assertNotNull(localState);
 
         // verify the owner of the task directory lock has been shifted over to our assigned StreamThread
         assertThat(directory.lockOwner(taskId), is(instanceOf(FakeStreamThread.class)));
@@ -954,7 +951,7 @@ private StateStore initializeStartupTasks(final TaskId taskId, final boolean createTaskDir) {
         Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
         Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
 
-        directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test"));
+        directory.initializeStartupTasks(metadata, new LogContext("test"));
 
         return store;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 63cbc441f8a82..458409e6d6598 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4669,14 +4669,16 @@ public void shouldListNotPausedTasks() {
     public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
         final Tasks taskRegistry = new Tasks(new LogContext());
         final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
+        final StateDirectory.StartupState startupState = mock(StateDirectory.StartupState.class);
         final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
         final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build();
         when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any()))
             .thenReturn(activeTask);
+        when(standbyTaskCreator.createStandbyTaskFromStartupLocalStore(eq(taskId00), eq(taskId00Partitions), any(), any())).thenReturn(startupTask);
 
         when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
-        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
+        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupState, (StateDirectory.StartupState) null);
 
         taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
@@ -4707,10 +4709,13 @@ public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
     public void shouldUseStartupTasksFromStateDirectoryAsStandby() {
         final Tasks taskRegistry = new Tasks(new LogContext());
         final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
+        final StateDirectory.StartupState startupState = mock(StateDirectory.StartupState.class);
         final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
+        when(standbyTaskCreator.createStandbyTaskFromStartupLocalStore(eq(taskId00), eq(taskId00Partitions), any(), any())).thenReturn(startupTask);
+
         when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
-        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
+        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupState, (StateDirectory.StartupState) null);
 
         assertFalse(taskRegistry.hasPendingTasksToInit());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 70224c8013c97..b647bd70dd333 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -208,6 +208,7 @@ public void shouldAddValueProvidersWithoutStatisticsToInjectedMetricsRecorderWhenRecordingLevelInfo() {
         context = getProcessorContext(RecordingLevel.INFO);
         rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+        rocksDBStore.init(context, rocksDBStore);
 
         verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull());
     }
@@ -218,6 +219,7 @@ public void shouldAddValueProvidersWithStatisticsToInjectedMetricsRecorderWhenRecordingLevelDebug() {
         context = getProcessorContext(RecordingLevel.DEBUG);
         rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+        rocksDBStore.init(context, rocksDBStore);
 
         verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), notNull());
     }
@@ -227,7 +229,9 @@ public void shouldRemoveValueProvidersFromInjectedMetricsRecorderOnClose() {
         rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
         try {
             context = getProcessorContext(RecordingLevel.DEBUG);
+
             rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+            rocksDBStore.init(context, rocksDBStore);
         } finally {
             rocksDBStore.close();
         }
@@ -259,6 +263,7 @@ public void shouldNotSetStatisticsInValueProvidersWhenUserProvidesStatistics() {
         context = getProcessorContext(RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
         rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+        rocksDBStore.init(context, rocksDBStore);
 
         verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), isNull());
     }
@@ -287,6 +292,7 @@ public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() throws Exception {
         context = getProcessorContext(RecordingLevel.DEBUG);
         rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+        rocksDBStore.init(context, rocksDBStore);
 
         verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), notNull(), eq(getStatistics(rocksDBStore)));
     }
@@ -325,9 +331,10 @@ public void shouldThrowWhenUserProvidesNewBlockBasedTableFormatConfig() {
             RecordingLevel.DEBUG,
             RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig.class
         );
+        rocksDBStore.openDB(context.appConfigs(), context.stateDir());
         assertThrows(
             ProcessorStateException.class,
-            () -> rocksDBStore.openDB(context.appConfigs(), context.stateDir()),
+            () -> rocksDBStore.init(context, rocksDBStore),
             "The used block-based table format configuration does not expose the " +
                 "block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure " +
                 "the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to " +
                "the RocksDB options."
         );
     }
 
@@ -356,6 +363,7 @@ public void shouldNotSetCacheInValueProvidersWhenUserProvidesPlainTableFormatConfig() {
         );
 
         rocksDBStore.openDB(context.appConfigs(), context.stateDir());
+        rocksDBStore.init(context, rocksDBStore);
 
         verify(metricsRecorder).addValueProviders(eq(DB_NAME), notNull(), isNull(), notNull());
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
index a2a6ac43dfd76..5106ea57454cd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
@@ -56,6 +56,11 @@ public String name() {
         return name;
     }
 
+    @Override
+    public void preInit(final StateStoreContext stateStoreContext) {
+        closed = false;
+    }
+
     @Override
     public void init(final StateStoreContext stateStoreContext, final StateStore root) {
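
Reviewer note (not part of the patch): a minimal sketch of how a custom store could honor the two-phase lifecycle this change introduces on StateStore. The class LocalArchiveStore and everything inside it are hypothetical illustration only; just preInit()/init() and StateStoreContext#register come from the Streams API. Per the StateDirectory changes above, preInit() may be invoked once at client startup, before any StreamThread owns the task, to open local state without registering a changelog; init() must then tolerate an already-open store, mirroring the `open` flag check added to RocksDBStore.

    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreContext;

    // Hypothetical store illustrating the preInit()/init() contract.
    public class LocalArchiveStore implements StateStore {
        private final String name;
        private boolean open = false;

        public LocalArchiveStore(final String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public void preInit(final StateStoreContext stateStoreContext) {
            // Open local files/DB under stateStoreContext.stateDir();
            // no changelog registration happens at this stage.
            open = true;
        }

        @Override
        public void init(final StateStoreContext stateStoreContext, final StateStore root) {
            if (!open) {
                preInit(stateStoreContext); // store was not opened at startup; open it now
            }
            // Complete registration now that a thread context exists
            // (restore callback body elided in this sketch).
            stateStoreContext.register(root, (key, value) -> { });
        }

        @Override
        public void flush() { }

        @Override
        public void close() {
            open = false;
        }

        @Override
        public boolean persistent() {
            return true;
        }

        @Override
        public boolean isOpen() {
            return open;
        }
    }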