KAFKA-6860: Fix NPE in Kafka Streams with EOS enabled (#5187)
Reviewers: John Roesler <john@confluent.io>, Ko Byoung Kwon, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
mjsax committed Jun 13, 2018
1 parent c92445a commit eaf6cf4
Showing 4 changed files with 67 additions and 62 deletions.
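The NPE addressed here can only surface when exactly-once processing is enabled, that is, when processing.guarantee is set to exactly_once; this is the same setting the patch reads via StreamsConfig.PROCESSING_GUARANTEE_CONFIG in the GlobalStateManagerImpl hunk below. A minimal sketch of such a configuration follows (application id and bootstrap servers are placeholder values, not taken from this commit):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfigSketch {
    public static void main(final String[] args) {
        // Minimal sketch: a Streams configuration with exactly-once processing enabled,
        // the mode under which the checkpoint-related NPE was reported (KAFKA-6860).
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    }
}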
AbstractStateManager.java
@@ -36,17 +36,18 @@ abstract class AbstractStateManager implements StateManager {
static final String CHECKPOINT_FILE_NAME = ".checkpoint";

final File baseDir;
final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();

private final boolean eosEnabled;
OffsetCheckpoint checkpoint;

final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
final Map<String, StateStore> stores = new LinkedHashMap<>();
final Map<String, StateStore> globalStores = new LinkedHashMap<>();

AbstractStateManager(final File baseDir) {
AbstractStateManager(final File baseDir,
final boolean eosEnabled) {
this.baseDir = baseDir;
this.eosEnabled = eosEnabled;
this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));

}

public void reinitializeStateStoresForPartitions(final Logger log,
@@ -62,11 +63,14 @@ public void reinitializeStateStoresForPartitions(final Logger log,
checkpointableOffsets.remove(topicPartition);
storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
}
try {
checkpoint.write(checkpointableOffsets);
} catch (final IOException fatalException) {
log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
throw new StreamsException("Failed to reinitialize global store.", fatalException);

if (!eosEnabled) {
try {
checkpoint.write(checkpointableOffsets);
} catch (final IOException fatalException) {
log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
throw new StreamsException("Failed to reinitialize global store.", fatalException);
}
}

for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
GlobalStateManagerImpl.java
@@ -69,7 +69,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener,
final StreamsConfig config) {
super(stateDirectory.globalStateDir());
super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));

this.log = logContext.logger(GlobalStateManagerImpl.class);
this.topology = topology;
@@ -92,16 +92,16 @@ public Set<String> initialize() {
if (!stateDirectory.lockGlobalState()) {
throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
}
} catch (IOException e) {
} catch (final IOException e) {
throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
}

try {
this.checkpointableOffsets.putAll(checkpoint.read());
} catch (IOException e) {
} catch (final IOException e) {
try {
stateDirectory.unlockGlobalState();
} catch (IOException e1) {
} catch (final IOException e1) {
log.error("Failed to unlock the global state directory", e);
}
throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
@@ -232,7 +232,7 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
}

final List<TopicPartition> topicPartitions = new ArrayList<>();
for (PartitionInfo partition : partitionInfos) {
for (final PartitionInfo partition : partitionInfos) {
topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
return topicPartitions;
@@ -253,8 +253,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,

long offset = globalConsumer.position(topicPartition);
final Long highWatermark = highWatermarks.get(topicPartition);
BatchingStateRestoreCallback
stateRestoreAdapter =
final BatchingStateRestoreCallback stateRestoreAdapter =
(BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
BatchingStateRestoreCallback)
? stateRestoreCallback
@@ -267,7 +266,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
try {
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : records) {
for (final ConsumerRecord<byte[], byte[]> record : records) {
if (record.key() != null) {
restoreRecords.add(KeyValue.pair(record.key(), record.value()));
}
@@ -294,11 +293,11 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
@Override
public void flush() {
log.debug("Flushing all global globalStores registered in the state manager");
for (StateStore store : this.globalStores.values()) {
for (final StateStore store : this.globalStores.values()) {
try {
log.trace("Flushing global store={}", store.name());
store.flush();
} catch (Exception e) {
} catch (final Exception e) {
throw new ProcessorStateException(String.format("Failed to flush global state store %s", store.name()), e);
}
}
@@ -316,7 +315,7 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException {
log.debug("Closing global storage engine {}", entry.getKey());
try {
entry.getValue().close();
} catch (Exception e) {
} catch (final Exception e) {
log.error("Failed to close global state store {}", entry.getKey(), e);
closeFailed.append("Failed to close global state store:")
.append(entry.getKey())
@@ -341,7 +340,7 @@ public void checkpoint(final Map<TopicPartition, Long> offsets) {
if (!checkpointableOffsets.isEmpty()) {
try {
checkpoint.write(checkpointableOffsets);
} catch (IOException e) {
} catch (final IOException e) {
log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
}
}
ProcessorStateManager.java
@@ -67,7 +67,7 @@ public ProcessorStateManager(final TaskId taskId,
final ChangelogReader changelogReader,
final boolean eosEnabled,
final LogContext logContext) throws IOException {
super(stateDirectory.directoryForTask(taskId));
super(stateDirectory.directoryForTask(taskId), eosEnabled);

this.log = logContext.logger(ProcessorStateManager.class);
this.taskId = taskId;
@@ -81,12 +81,11 @@ public ProcessorStateManager(final TaskId taskId,
offsetLimits = new HashMap<>();
standbyRestoredOffsets = new HashMap<>();
this.isStandby = isStandby;
restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
restoreCallbacks = isStandby ? new HashMap<>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;

// load the checkpoint information
checkpointableOffsets.putAll(checkpoint.read());

if (eosEnabled) {
// delete the checkpoint file after finish loading its stored offsets
checkpoint.delete();
@@ -169,11 +168,7 @@ public Map<TopicPartition, Long> checkpointed() {
final int partition = getPartition(topicName);
final TopicPartition storePartition = new TopicPartition(topicName, partition);

if (checkpointableOffsets.containsKey(storePartition)) {
partitionsAndOffsets.put(storePartition, checkpointableOffsets.get(storePartition));
} else {
partitionsAndOffsets.put(storePartition, -1L);
}
partitionsAndOffsets.put(storePartition, checkpointableOffsets.getOrDefault(storePartition, -1L));
}
return partitionsAndOffsets;
}
@@ -340,7 +335,7 @@ public StateStore getGlobalStore(final String name) {
return globalStores.get(name);
}

private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback callback) {
private BatchingStateRestoreCallback getBatchingRestoreCallback(final StateRestoreCallback callback) {
if (callback instanceof BatchingStateRestoreCallback) {
return (BatchingStateRestoreCallback) callback;
}
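For context on where the NullPointerException came from: with EOS enabled, ProcessorStateManager deletes the checkpoint file right after loading its offsets (see the constructor hunk above), so the unconditional checkpoint.write(...) in reinitializeStateStoresForPartitions could run against a checkpoint handle that is no longer usable. The following is a simplified, hypothetical sketch of that failure mode and of the guard the patch adds; class and field names are illustrative rather than the actual Kafka Streams types, and the null handle under EOS is an assumption made for illustration.

import java.io.IOException;
import java.util.Map;

// Hypothetical, simplified model of the failure mode (not the real Kafka classes).
class StateManagerSketch {
    private final boolean eosEnabled;
    private final CheckpointStub checkpoint;   // assumed null once EOS deletes the checkpoint file

    StateManagerSketch(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
        this.checkpoint = eosEnabled ? null : new CheckpointStub();
    }

    // Before the fix: the write ran unconditionally and threw an NPE under EOS.
    // After the fix (as in the AbstractStateManager hunk above): the write is skipped when EOS is on.
    void reinitialize(final Map<String, Long> offsets) {
        if (!eosEnabled) {
            try {
                checkpoint.write(offsets);
            } catch (final IOException fatalException) {
                throw new RuntimeException("Failed to reinitialize store.", fatalException);
            }
        }
    }

    static final class CheckpointStub {
        void write(final Map<String, Long> offsets) throws IOException {
            // stand-in for persisting offsets to the .checkpoint file
        }
    }
}

Skipping the write when EOS is enabled is consistent with the constructor behavior shown above: under exactly-once the checkpoint file is deleted after its offsets are loaded, so there is nothing to keep up to date during reinitialization.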
