Skip to content

Commit

Permalink
Filter expired messages based on creation-time and ttl headers
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jun 7, 2021
1 parent 13a79c5 commit 525105a
Showing 1 changed file with 30 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
import static org.eclipse.ditto.connectivity.api.EnforcementFactoryFactory.newEnforcementFilterFactory;
import static org.eclipse.ditto.internal.models.placeholders.PlaceholderFactory.newHeadersPlaceholder;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;

import javax.annotation.Nullable;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
Expand Down Expand Up @@ -49,6 +54,8 @@
*/
final class KafkaConsumerActor extends BaseConsumerActor {

private static final String TTL = "ttl";
private static final String CREATION_TIME = "creation-time";
static final String ACTOR_NAME_PREFIX = "kafkaConsumer-";

private final ThreadSafeDittoLoggingAdapter log;
Expand Down Expand Up @@ -148,6 +155,7 @@ private KafkaConsumerStream(
.throttle(kafkaConfig.getConsumerThrottlingConfig().getLimit(),
kafkaConfig.getConsumerThrottlingConfig().getInterval())
.filter(consumerRecord -> consumerRecord.value() != null)
.filter(this::isNotExpired)
.map(kafkaMessageTransformer::transform)
.divertTo(this.externalMessageSink(), this::isExternalMessage)
.divertTo(this.dittoRuntimeExceptionSink(), this::isDittoRuntimeException)
Expand All @@ -167,6 +175,28 @@ private ExternalMessage extractExternalMessage(final Either<ExternalMessage, Dit
return value.left().get();
}

private boolean isNotExpired(final ConsumerRecord<String, String> consumerRecord) {
final Headers headers = consumerRecord.headers();
final long now = Instant.now().toEpochMilli();
try {
final Optional<Long> creationTimeOptional = Optional.ofNullable(headers.lastHeader(CREATION_TIME))
.map(Header::value)
.map(String::new)
.map(Long::parseLong);
final Optional<Long> ttlOptional = Optional.ofNullable(headers.lastHeader(TTL))
.map(Header::value)
.map(String::new)
.map(Long::parseLong);
if (creationTimeOptional.isPresent() && ttlOptional.isPresent()) {
return now - creationTimeOptional.get() >= ttlOptional.get();
}
return true;
} catch (final Exception e) {
// Errors during reading/parsing headers should not cause the message to be dropped.
return true;
}
}

private void forwardExternalMessage(final ExternalMessage value) {
inboundMonitor.success(value);
forwardToMappingActor(value,
Expand Down

0 comments on commit 525105a

Please sign in to comment.