[FLINK-5293] Add test for Kafka backwards compatibility
kl0u authored and aljoscha committed Dec 20, 2016
1 parent 216653a commit b43067b
Showing 5 changed files with 543 additions and 8 deletions.
@@ -321,14 +321,18 @@ public void initializeState(FunctionInitializationContext context) throws Exception
 		offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
 		if (context.isRestored()) {
-			restoreToOffset = new HashMap<>();
-			for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
-				restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
-			}
+			if (restoreToOffset == null) {
+				restoreToOffset = new HashMap<>();
+				for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
+					restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
+				}
 
-			LOG.info("Setting restore state in the FlinkKafkaConsumer.");
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Using the following offsets: {}", restoreToOffset);
+				LOG.info("Setting restore state in the FlinkKafkaConsumer.");
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Using the following offsets: {}", restoreToOffset);
+				}
+			} else if (restoreToOffset.isEmpty()) {
+				restoreToOffset = null;
 			}
 		} else {
 			LOG.info("No restore state for FlinkKafkaConsumer.");
@@ -197,7 +197,7 @@ public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
 	 *
 	 * @param snapshotState The offsets for the partitions
 	 */
-	public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
+	public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
 		for (KafkaTopicPartitionState<?> partition : allPartitions) {
 			Long offset = snapshotState.get(partition.getKafkaTopicPartition());
 			if (offset != null) {
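
The second hunk only widens the restoreOffsets parameter from the concrete HashMap to the Map interface, so callers can pass any map implementation. A small, hypothetical illustration of that flexibility (all names below are invented for the example, not Flink code):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only; not Flink code.
public class MapParameterSketch {

	// Declaring the parameter as Map (the interface) instead of HashMap
	// lets callers pass any Map implementation.
	static void restore(Map<String, Long> offsets) {
		offsets.forEach((partition, offset) -> System.out.println(partition + " -> " + offset));
	}

	public static void main(String[] args) {
		Map<String, Long> offsets = new HashMap<>();
		offsets.put("topic-0", 42L);

		restore(offsets);                              // plain HashMap
		restore(new TreeMap<>(offsets));               // sorted map
		restore(Collections.unmodifiableMap(offsets)); // read-only view
	}
}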
