Skip to content

Commit

Permalink
#586 drop consumed messages in dry run mode, disable auto commit in d…
Browse files Browse the repository at this point in the history
…ry run mode

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Jun 10, 2021
1 parent 2546315 commit 040804a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
Expand Up @@ -70,7 +70,7 @@ private KafkaConsumerActor(final Connection connection,

log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);

final ConsumerSettings<String, String> consumerSettings = propertiesFactory.getConsumerSettings();
final ConsumerSettings<String, String> consumerSettings = propertiesFactory.getConsumerSettings(dryRun);
final Enforcement enforcement = source.getEnforcement().orElse(null);
final EnforcementFilterFactory<Map<String, String>, Signal<?>> headerEnforcementFilterFactory =
enforcement != null
Expand Down Expand Up @@ -147,13 +147,14 @@ private KafkaConsumerStream(
final KafkaConfig kafkaConfig,
final ConsumerSettings<String, String> consumerSettings,
final KafkaMessageTransformer kafkaMessageTransformer,
final boolean dryRun, //TODO: handle dry run
final boolean dryRun,
final Materializer materializer) {

this.materializer = materializer;
runnableKafkaStream = Consumer.plainSource(consumerSettings, Subscriptions.topics(sourceAddress))
.throttle(kafkaConfig.getConsumerThrottlingConfig().getLimit(),
kafkaConfig.getConsumerThrottlingConfig().getInterval())
.filter(record -> isNotDryRun(record, dryRun))
.filter(consumerRecord -> consumerRecord.value() != null)
.filter(this::isNotExpired)
.map(kafkaMessageTransformer::transform)
Expand Down Expand Up @@ -197,6 +198,14 @@ private boolean isNotExpired(final ConsumerRecord<String, String> consumerRecord
}
}

private boolean isNotDryRun(final ConsumerRecord<String, String> record, final boolean dryRun) {
if (dryRun && log.isDebugEnabled()) {
log.debug("Dropping record (key: {}, topic: {}, partition: {}, offset: {}) in dry run mode.",
record.key(), record.topic(), record.partition(), record.offset());
}
return !dryRun;
}

private void forwardExternalMessage(final ExternalMessage value) {
inboundMonitor.success(value);
forwardToMappingActor(value,
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -68,11 +69,16 @@ static PropertiesFactory newInstance(final Connection connection, final KafkaCon
*
* @return the settings.
*/
ConsumerSettings<String, String> getConsumerSettings() {
return ConsumerSettings.apply(config.getConsumerConfig(), new StringDeserializer(), new StringDeserializer())
.withBootstrapServers(bootstrapServers)
.withGroupId(connection.getId().toString())
.withClientId(clientId + "-consumer");
ConsumerSettings<String, String> getConsumerSettings(final boolean dryRun) {
final ConsumerSettings<String, String> consumerSettings =
ConsumerSettings.apply(config.getConsumerConfig(), new StringDeserializer(), new StringDeserializer())
.withBootstrapServers(bootstrapServers)
.withGroupId(connection.getId().toString())
.withClientId(clientId + "-consumer");

// disable auto commit in dry run mode
return dryRun ? consumerSettings.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") :
consumerSettings;
}

ProducerSettings<String, String> getProducerSettings() {
Expand Down
Expand Up @@ -108,7 +108,7 @@ public void addsBootstrapServersAndFlattensPropertiesFromProducerSettings() {
@Test
public void addsBootstrapServersAndFlattensPropertiesFromConsumerSettings() {

final ConsumerSettings<String, String> consumerSettings = underTest.getConsumerSettings();
final ConsumerSettings<String, String> consumerSettings = underTest.getConsumerSettings(false);
final Map<String, Object> properties = consumerSettings.getProperties();

final List<String> servers =
Expand Down

0 comments on commit 040804a

Please sign in to comment.