Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5998: fix checkpointableOffsets handling #7030

Merged
merged 3 commits into from Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
Expand All @@ -32,6 +33,7 @@
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

Expand All @@ -48,30 +50,41 @@
import java.util.Optional;
import java.util.Set;

import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;

/**
* This class is responsible for the initialization, restoration, closing, flushing etc
* of Global State Stores. There is only ever 1 instance of this class per Application Instance.
*/
public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager {
public class GlobalStateManagerImpl implements GlobalStateManager {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop sharing mutable state between a superclass and subclass. The only reason to do it was to support the re-initialization logic, but the checkpoint map can just as easily be passed in as a parameter.

private final Logger log;
private final boolean eosEnabled;
private final ProcessorTopology topology;
private final Consumer<byte[], byte[]> globalConsumer;
private final File baseDir;
private final StateDirectory stateDirectory;
private final Set<String> globalStoreNames = new HashSet<>();
private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
private final StateRestoreListener stateRestoreListener;
private InternalProcessorContext processorContext;
private InternalProcessorContext globalProcessorContext;
private final int retries;
private final long retryBackoffMs;
private final Duration pollTime;
private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
private final OffsetCheckpoint checkpointFile;
private final Map<TopicPartition, Long> checkpointFileCache;

public GlobalStateManagerImpl(final LogContext logContext,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> globalConsumer,
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener,
final StreamsConfig config) {
super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
baseDir = stateDirectory.globalStateDir();
checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
checkpointFileCache = new HashMap<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me a really long time to decipher the actual purposes of "checkpoint" and "checkpointableOffsets". I've renamed them to "checkpointFile" and "checkpointFileCache" to be more self-documenting.


// Find non persistent store's topics
final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
Expand All @@ -81,19 +94,19 @@ public GlobalStateManagerImpl(final LogContext logContext,
}
}

this.log = logContext.logger(GlobalStateManagerImpl.class);
log = logContext.logger(GlobalStateManagerImpl.class);
this.topology = topology;
this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
this.stateRestoreListener = stateRestoreListener;
this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
}

@Override
public void setGlobalProcessorContext(final InternalProcessorContext processorContext) {
this.processorContext = processorContext;
public void setGlobalProcessorContext(final InternalProcessorContext globalProcessorContext) {
this.globalProcessorContext = globalProcessorContext;
}

@Override
Expand All @@ -103,11 +116,11 @@ public Set<String> initialize() {
throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
}
} catch (final IOException e) {
throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir), e);
}

try {
this.checkpointableOffsets.putAll(checkpoint.read());
checkpointFileCache.putAll(checkpointFile.read());
} catch (final IOException e) {
try {
stateDirectory.unlockGlobalState();
Expand All @@ -120,20 +133,25 @@ public Set<String> initialize() {
final List<StateStore> stateStores = topology.globalStateStores();
for (final StateStore stateStore : stateStores) {
globalStoreNames.add(stateStore.name());
stateStore.init(processorContext, stateStore);
stateStore.init(globalProcessorContext, stateStore);
}
return Collections.unmodifiableSet(globalStoreNames);
}

@Override
public void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions,
final InternalProcessorContext processorContext) {
super.reinitializeStateStoresForPartitions(
StateManagerUtil.reinitializeStateStoresForPartitions(
log,
eosEnabled,
baseDir,
globalStores,
topology.storeToChangelogTopic(),
partitions,
processorContext);
processorContext,
checkpointFile,
checkpointFileCache
);

globalConsumer.assign(partitions);
globalConsumer.seekToBeginning(partitions);
Expand Down Expand Up @@ -261,7 +279,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
final RecordConverter recordConverter) {
for (final TopicPartition topicPartition : topicPartitions) {
globalConsumer.assign(Collections.singletonList(topicPartition));
final Long checkpoint = checkpointableOffsets.get(topicPartition);
final Long checkpoint = checkpointFileCache.get(topicPartition);
if (checkpoint != null) {
globalConsumer.seek(topicPartition, checkpoint);
} else {
Expand Down Expand Up @@ -293,14 +311,14 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
log.warn("Restoring GlobalStore {} failed due to: {}. Deleting global store to recreate from scratch.",
storeName,
recoverableException.toString());
reinitializeStateStoresForPartitions(recoverableException.partitions(), processorContext);
reinitializeStateStoresForPartitions(recoverableException.partitions(), globalProcessorContext);

stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
restoreCount = 0L;
}
}
stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount);
checkpointableOffsets.put(topicPartition, offset);
checkpointFileCache.put(topicPartition, offset);
}
}

Expand All @@ -313,7 +331,7 @@ public void flush() {
try {
log.trace("Flushing global store={}", store.name());
store.flush();
} catch (final Exception e) {
} catch (final RuntimeException e) {
throw new ProcessorStateException(
String.format("Failed to flush global state store %s", store.name()),
e
Expand All @@ -338,12 +356,12 @@ public void close(final boolean clean) throws IOException {
log.debug("Closing global storage engine {}", entry.getKey());
try {
entry.getValue().get().close();
} catch (final Exception e) {
} catch (final RuntimeException e) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this PR is to clean up difficult-to-maintain code, I also included other cleanups, like dropping unnecessary this modifiers, restricting too-broad catch blocks, etc.

log.error("Failed to close global state store {}", entry.getKey(), e);
closeFailed.append("Failed to close global state store:")
.append(entry.getKey())
.append(". Reason: ")
.append(e.toString())
.append(e)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary toString

.append("\n");
}
globalStores.put(entry.getKey(), Optional.empty());
Expand All @@ -361,28 +379,28 @@ public void close(final boolean clean) throws IOException {

@Override
public void checkpoint(final Map<TopicPartition, Long> offsets) {
checkpointableOffsets.putAll(offsets);
checkpointFileCache.putAll(offsets);

final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();

// Skip non persistent store
for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointFileCache.entrySet()) {
final String topic = topicPartitionOffset.getKey().topic();
if (!globalNonPersistentStoresTopics.contains(topic)) {
filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
}
}

try {
checkpoint.write(filteredOffsets);
checkpointFile.write(filteredOffsets);
} catch (final IOException e) {
log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpointFile, e);
}
}

@Override
public Map<TopicPartition, Long> checkpointed() {
return Collections.unmodifiableMap(checkpointableOffsets);
return Collections.unmodifiableMap(checkpointFileCache);
}


Expand Down