Skip to content

Commit

Permalink
Add tests for filtering of expired inbound Kafka messages
Browse files Browse the repository at this point in the history
Additionally log failed expiry validation on warning.

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Feb 3, 2022
1 parent 2e1f6aa commit abc6509
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.slf4j.Logger;

/**
* Kafka transformation result containing either a {@link DittoRuntimeException} in case of a failure or an
Expand All @@ -32,12 +34,14 @@ final class TransformationResult {

private static final String CREATION_TIME = "creation-time";
private static final String TTL = "ttl";
private static final Logger LOGGER = DittoLoggerFactory.getLogger(TransformationResult.class);

@Nullable private final DittoRuntimeException dittoRuntimeException;
@Nullable private final ExternalMessage externalMessage;

private TransformationResult(@Nullable final DittoRuntimeException dittoRuntimeException,
@Nullable final ExternalMessage externalMessage) {

this.dittoRuntimeException = dittoRuntimeException;
this.externalMessage = externalMessage;
}
Expand Down Expand Up @@ -76,11 +80,17 @@ boolean isExpired() {
.map(Long::parseLong);
if (creationTimeOptional.isPresent() && ttlOptional.isPresent()) {
final long timeSinceCreation = now - creationTimeOptional.get();
return timeSinceCreation >= ttlOptional.get();
final var result = timeSinceCreation >= ttlOptional.get();
LOGGER.debug("Evaluating Kafka message expiry with creation-time: <{}>, time since creation: " +
"<{}> and ttl: <{}> to: <{}>",
creationTimeOptional, timeSinceCreation, ttlOptional, result);
return result;
}
return false;
} catch (final Exception e) {
// Errors during reading/parsing headers should not cause the message to be dropped.
final Object message = null != externalMessage ? externalMessage : dittoRuntimeException;
LOGGER.warn("Encountered error checking the expiry of Kafka message: <{}>, <{}>", message, e);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -155,4 +156,47 @@ public void appliesBackPressureWhenMessagesAreNotAcknowledged() {
}};
}

@Test
public void filtersExpiredMessages() {
new TestKit(actorSystem) {{
/*
* Given we have a kafka source which emits records that are all transformed to External messages.
*/
final ConsumerRecord<String, ByteBuffer> consumerRecord =
new ConsumerRecord<>("topic", 1, 1, Instant.now().toEpochMilli(), TimestampType.LOG_APPEND_TIME,
-1L, NULL_SIZE, NULL_SIZE, "Key", ByteBufferUtils.fromUtf8String("Value"),
new RecordHeaders());
final ConsumerMessage.CommittableMessage<String, ByteBuffer> committableMessage =
new ConsumerMessage.CommittableMessage<>(consumerRecord, mock(
ConsumerMessage.CommittableOffset.class));
final AtLeastOnceKafkaConsumerSourceSupplier sourceSupplier =
mock(AtLeastOnceKafkaConsumerSourceSupplier.class);
when(sourceSupplier.get()).thenReturn(source);
final KafkaMessageTransformer messageTransformer = mock(KafkaMessageTransformer.class);
final ExternalMessage message = mock(ExternalMessage.class);
when(message.getHeaders()).thenReturn(Map.of("creation-time", "0", "ttl", "1000"));
final TransformationResult result = TransformationResult.successful(message);

when(messageTransformer.transform(ArgumentMatchers.<ConsumerMessage.CommittableMessage<String, ByteBuffer>>any()))
.thenReturn(CommittableTransformationResult.of(result, committableMessage.committableOffset()));
final ConnectionMonitor connectionMonitor = mock(ConnectionMonitor.class);
final ConnectionMonitor ackMonitor = mock(ConnectionMonitor.class);
final Materializer materializer = Materializer.createMaterializer(actorSystem);
final Sink<DittoRuntimeException, TestSubscriber.Probe<DittoRuntimeException>> dreSink =
TestSink.create(actorSystem);

// When starting the stream
new AtLeastOnceConsumerStream(sourceSupplier, CommitterSettings.apply(actorSystem),
TestConstants.KAFKA_THROTTLING_CONFIG,
messageTransformer, false, materializer,
connectionMonitor, ackMonitor, inboundMappingSink, dreSink,
ConnectionId.generateRandom(), "someUniqueId");

inboundSinkProbe.ensureSubscription();
assertThat(sourceQueue.get().offer(committableMessage)).isEqualTo(QueueOfferResult.enqueued());
inboundSinkProbe.request(1);
inboundSinkProbe.expectNoMessage();
}};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -147,4 +148,41 @@ public void appliesBackPressureWhenMessagesAreNotAcknowledged() {
}};
}

@Test
public void filtersExpiredMessages() {
new TestKit(actorSystem) {{
/*
* Given we have a kafka source which emits records that are all transformed to External messages.
*/
final ConsumerRecord<String, ByteBuffer> consumerRecord =
new ConsumerRecord<>("topic", 1, 1, Instant.now().toEpochMilli(), TimestampType.LOG_APPEND_TIME,
-1L, NULL_SIZE, NULL_SIZE, "Key", ByteBufferUtils.fromUtf8String("Value"),
new RecordHeaders());
final AtMostOnceKafkaConsumerSourceSupplier sourceSupplier =
mock(AtMostOnceKafkaConsumerSourceSupplier.class);
when(sourceSupplier.get()).thenReturn(source);
final KafkaMessageTransformer messageTransformer = mock(KafkaMessageTransformer.class);
final ExternalMessage message = mock(ExternalMessage.class);
when(message.getHeaders()).thenReturn(Map.of("creation-time", "0", "ttl", "1000"));
final TransformationResult result = TransformationResult.successful(message);
when(messageTransformer.transform(ArgumentMatchers.<ConsumerRecord<String, ByteBuffer>>any()))
.thenReturn(result);
final ConnectionMonitor connectionMonitor = mock(ConnectionMonitor.class);
final Materializer materializer = Materializer.createMaterializer(actorSystem);
final Sink<DittoRuntimeException, TestSubscriber.Probe<DittoRuntimeException>> dreSink =
TestSink.create(actorSystem);

// When starting the stream
new AtMostOnceConsumerStream(sourceSupplier, TestConstants.KAFKA_THROTTLING_CONFIG, messageTransformer,
false, materializer,
connectionMonitor, inboundMappingSink, dreSink,
ConnectionId.generateRandom(),
"someUniqueId");

assertThat(sourceQueue.get().offer(consumerRecord)).isEqualTo(QueueOfferResult.enqueued());
inboundSinkProbe.request(1);
inboundSinkProbe.expectNoMessage();
}};
}

}

0 comments on commit abc6509

Please sign in to comment.