improves: Error in onPartitionsAssigned in parallel consumer (#537)
* Add Invalid Offset Metadata error policy option

* Fix file headers

* Fix tests

* Add some javadocs to the new invalidOffsetMetadataPolicy option

* Add suggested missing test
nachomdo committed Jul 17, 2023
1 parent f5debce commit 8417498
Showing 4 changed files with 99 additions and 10 deletions.
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
@@ -301,6 +301,29 @@ public void setCommitInterval(Duration commitInterval) {

public static final Duration DEFAULT_STATIC_RETRY_DELAY = Duration.ofSeconds(1);

/**
* Error handling strategy to use when invalid offset metadata is encountered. This could happen accidentally, or
* deliberately if the user attempts to reuse an existing consumer group id.
*/
public enum InvalidOffsetMetadataHandlingPolicy {
/**
* Fails and shuts down the application. This is the default.
*/
FAIL,
/**
* Ignores the error, logs a warning message, and continues processing from the last committed offset.
*/
IGNORE
}

/**
* Controls the error handling behaviour to use when invalid offset metadata from a pre-existing consumer group is encountered.
* A potential scenario where this could occur is when a consumer group id from a Kafka Streams application is accidentally reused.
* <p>
* Default is {@link InvalidOffsetMetadataHandlingPolicy#FAIL}
*/
@Builder.Default
private final InvalidOffsetMetadataHandlingPolicy invalidOffsetMetadataPolicy = InvalidOffsetMetadataHandlingPolicy.FAIL;
/**
* When a message fails, how long the system should wait before trying that message again. Note that this will not
* be exact, and is just a target.
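A caller opts into the new behaviour through the options builder. Below is a minimal sketch (not part of this commit) of enabling the lenient policy, assuming the Lombok-generated ParallelConsumerOptions.builder() implied by the @Builder.Default annotation above, and an already-constructed Kafka Consumer named consumer:

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy;

// Build options that log a warning and resume from the last committed offset,
// instead of shutting down, when foreign (e.g. Kafka Streams) offset metadata is found.
ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
        .consumer(consumer) // assumed: an existing org.apache.kafka.clients.consumer.Consumer<String, String>
        .invalidOffsetMetadataPolicy(InvalidOffsetMetadataHandlingPolicy.IGNORE)
        .build();

Leaving the option unset keeps the previous fail-fast behaviour, since the default is InvalidOffsetMetadataHandlingPolicy.FAIL.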
@@ -1,9 +1,10 @@
package io.confluent.parallelconsumer.offsets;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes;
import lombok.Getter;
@@ -101,8 +102,12 @@ public String getDecodedString() {
return binaryArrayString;
}

@SneakyThrows
public HighestOffsetAndIncompletes getDecodedIncompletes(long baseOffset) {
return getDecodedIncompletes(baseOffset, ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy.FAIL);
}

@SneakyThrows
public HighestOffsetAndIncompletes getDecodedIncompletes(long baseOffset, ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy errorPolicy) {
HighestOffsetAndIncompletes binaryArrayString = switch (encoding) {
// case ByteArray -> deserialiseByteArrayToBitMapString(data);
// case ByteArrayCompressed -> deserialiseByteArrayToBitMapString(decompressZstd(data));
@@ -114,8 +119,14 @@ public HighestOffsetAndIncompletes getDecodedIncompletes(long baseOffset) {
case BitSetV2Compressed -> deserialiseBitSetWrapToIncompletes(BitSetV2, baseOffset, decompressZstd(data));
case RunLengthV2 -> runLengthDecodeToIncompletes(encoding, baseOffset, data);
case RunLengthV2Compressed -> runLengthDecodeToIncompletes(RunLengthV2, baseOffset, decompressZstd(data));
case KafkaStreams, KafkaStreamsV2 ->
case KafkaStreams, KafkaStreamsV2 ->{
if (errorPolicy == ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy.IGNORE) {
log.warn("Ignoring existing Kafka Streams offset metadata and reusing offsets");
yield HighestOffsetAndIncompletes.of(baseOffset);
} else {
throw new KafkaStreamsEncodingNotSupported();
}
}
default ->
throw new UnsupportedOperationException("Encoding (" + encoding.description() + ") not supported");
};
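To make the difference between the two policies concrete, here is a small sketch (not part of the commit) of calling the decoder directly, assuming it runs from within the io.confluent.parallelconsumer.offsets package, with metadata holding the raw offset-metadata bytes of a Kafka Streams consumer group and baseOffset the last committed offset:

EncodedOffsetPair pair = EncodedOffsetPair.unwrap(metadata);

// Default (FAIL) policy: the original single-argument overload is unchanged and
// throws KafkaStreamsEncodingNotSupported for these magic numbers.
// pair.getDecodedIncompletes(baseOffset);

// IGNORE policy: logs a warning and restarts tracking from the committed base offset.
var reset = pair.getDecodedIncompletes(baseOffset,
        ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy.IGNORE);
// reset.getHighestSeenOffset() -> Optional.of(baseOffset)
// reset.getIncompleteOffsets() -> empty set

In normal operation the policy is not passed by hand: OffsetMapCodecManager (next file) reads it from ParallelConsumerOptions and forwards it to this decoder.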
@@ -1,9 +1,10 @@
package io.confluent.parallelconsumer.offsets;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.state.PartitionState;
@@ -62,6 +63,8 @@ public class OffsetMapCodecManager<K, V> {

private final PCModule module;

private static ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy errorPolicy = ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy.FAIL;

/**
* Decoding result for encoded offsets
*/
@@ -100,6 +103,9 @@ public static HighestOffsetAndIncompletes of() {
// todo remove consumer #233
public OffsetMapCodecManager(PCModule<K, V> module) {
this.module = module;
if (module != null){
this.errorPolicy = module.options().getInvalidOffsetMetadataPolicy();
}
}

/**
@@ -243,7 +249,7 @@ static HighestOffsetAndIncompletes decodeCompressedOffsets(long nextExpectedOffs
return HighestOffsetAndIncompletes.of(highestSeenOffsetIsThen);
} else {
var result = EncodedOffsetPair.unwrap(decodedBytes);
return result.getDecodedIncompletes(nextExpectedOffset);
return result.getDecodedIncompletes(nextExpectedOffset, errorPolicy);
}
}

@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.offsets;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import com.google.common.truth.Truth;
@@ -43,7 +43,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Optional.of;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

// todo refactor - remove tests which use hard coded state vs dynamic state - #compressionCycle, #selialiseCycle, #runLengthEncoding, #loadCompressedRunLengthRncoding
@Slf4j
@@ -365,7 +365,7 @@ void deserialiseBitSet() {
*/
@SneakyThrows
@Test
void deserialiseKafkaStreamsV1() {
void deserialiseKafkaStreamsV1WithDefaultErrorPolicy() {
final var input = ByteBuffer.allocate(32);
// magic number
input.put((byte) 1);
@@ -377,12 +377,33 @@ void deserialiseKafkaStreamsV1() {
.isInstanceOf(KafkaStreamsEncodingNotSupported.class);
}

/**
* Tests that the metadata is ignored and offsets are reset when InvalidOffsetMetadataHandlingPolicy.IGNORE is used and Kafka Streams (as far as we can guess) magic numbers are found in the offset metadata.
*/
@SneakyThrows
@Test
void deserialiseKafkaStreamsV1WithIgnoreErrorPolicy() {
final var input = ByteBuffer.allocate(32);
// magic number
input.put((byte) 1);
// timestamp
input.putLong(System.currentTimeMillis());

EncodedOffsetPair encodedOffsetPair = EncodedOffsetPair.unwrap(input.array());

OffsetMapCodecManager.HighestOffsetAndIncompletes longs = encodedOffsetPair.getDecodedIncompletes(100L, ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy.IGNORE);

assertThat(longs.getHighestSeenOffset()).isEqualTo(Optional.of(100L));
assertThat(longs.getIncompleteOffsets()).isEqualTo(Collections.emptySet());

}

/**
* Tests for friendly errors when Kafka Streams V2 (as far as we can guess) magic numbers are found in the offset metadata.
*/
@SneakyThrows
@Test
void deserialiseKafkaStreamsV2() {
void deserialiseKafkaStreamsV2WithDefaultErrorPolicy() {
final var input = ByteBuffer.allocate(32);
// magic number
input.put((byte) 2);
@@ -403,6 +424,34 @@ void deserialiseKafkaStreamsV2() {
.isInstanceOf(KafkaStreamsEncodingNotSupported.class);
}

/**
* Tests that the metadata is ignored and offsets are reset when InvalidOffsetMetadataHandlingPolicy.IGNORE is used and Kafka Streams V2 (as far as we can guess) magic numbers are found in the offset metadata.
*/
@SneakyThrows
@Test
void deserialiseKafkaStreamsV2WithIgnoreErrorPolicy() {
final var input = ByteBuffer.allocate(32);
// magic number
input.put((byte) 2);
// timestamp
input.putLong(System.currentTimeMillis());
// metadata
// number of entries
input.putInt(1);
// key size
input.putInt(1);
// key
input.put((byte) 'a');
// value
input.putLong(1L);

EncodedOffsetPair encodedOffsetPair = EncodedOffsetPair.unwrap(input.array());
OffsetMapCodecManager.HighestOffsetAndIncompletes longs = encodedOffsetPair.getDecodedIncompletes(100L, ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy.IGNORE);

assertThat(longs.getHighestSeenOffset()).isEqualTo(Optional.of(100L));
assertThat(longs.getIncompleteOffsets()).isEqualTo(Collections.emptySet());
}

@SneakyThrows
@Test
void compressionCycle() {
