Skip to content

Commit

Permalink
#586 use streams to publish messages to kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Johannes Schneider <johannes.schneider@bosch.io>
  • Loading branch information
jokraehe committed Jun 11, 2021
1 parent a023f3c commit 49a0c5e
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 408 deletions.
Expand Up @@ -32,17 +32,25 @@ public final class DefaultKafkaConfig implements KafkaConfig {
private static final String CONFIG_PATH = "kafka";
private static final String CONSUMER_PATH = "consumer";
private static final String PRODUCER_PATH = "producer";
private static final String PRODUCER_QUEUE_SIZE = "producer-queue-size";
private static final String PRODUCER_PARALLELISM = "producer-parallelism";

private final Config consumerConfig;
private final Config producerConfig;
private final ThrottlingConfig consumerThrottlingConfig;

private final Config producerConfig;
private final int producerQueueSize;
private final int producerParallelism;

private DefaultKafkaConfig(final ScopedConfig kafkaScopedConfig) {
consumerConfig = kafkaScopedConfig.getConfig(CONSUMER_PATH);
producerConfig = kafkaScopedConfig.getConfig(PRODUCER_PATH);
consumerThrottlingConfig = ThrottlingConfig.of(kafkaScopedConfig.hasPath(CONSUMER_PATH)
? kafkaScopedConfig.getConfig(CONSUMER_PATH)
: ConfigFactory.empty());

producerConfig = kafkaScopedConfig.getConfig(PRODUCER_PATH);
producerQueueSize = kafkaScopedConfig.getInt(PRODUCER_QUEUE_SIZE);
producerParallelism = kafkaScopedConfig.getInt(PRODUCER_PARALLELISM);
}

/**
Expand All @@ -57,28 +65,40 @@ public static DefaultKafkaConfig of(final Config config) {
}

@Override
public ThrottlingConfig getConsumerThrottlingConfig() {
return consumerThrottlingConfig;
public Config getConsumerConfig() {
return consumerConfig;
}

@Override
public Config getConsumerConfig() {
return consumerConfig;
public ThrottlingConfig getConsumerThrottlingConfig() {
return consumerThrottlingConfig;
}

@Override
public Config getProducerConfig() {
return producerConfig;
}

@Override
public int getProducerQueueSize() {
return producerQueueSize;
}

@Override
public int getProducerParallelism() {
return producerParallelism;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DefaultKafkaConfig that = (DefaultKafkaConfig) o;
return Objects.equals(consumerConfig, that.consumerConfig) &&
Objects.equals(consumerThrottlingConfig, that.consumerThrottlingConfig) &&
Objects.equals(producerConfig, that.producerConfig) &&
Objects.equals(consumerThrottlingConfig, that.consumerThrottlingConfig);
Objects.equals(producerQueueSize, that.producerQueueSize) &&
Objects.equals(producerParallelism, that.producerParallelism);
}

@Override
Expand All @@ -90,8 +110,10 @@ public int hashCode() {
public String toString() {
return getClass().getSimpleName() + " [" +
"consumerConfig=" + consumerConfig +
", producerConfig=" + producerConfig +
", consumerThrottlingConfig=" + consumerThrottlingConfig +
", producerConfig=" + producerConfig +
", producerQueueSize=" + producerQueueSize +
", producerParallelism=" + producerParallelism +
"]";
}
}
Expand Up @@ -25,18 +25,18 @@
public interface KafkaConfig {

/**
* Returns the consumer throttling config.
* Returns the Config for consumers needed by the Kafka client.
*
* @return the config.
* @return consumer configuration needed by the Kafka client.
*/
ThrottlingConfig getConsumerThrottlingConfig();
Config getConsumerConfig();

/**
* Returns the Config for consumers needed by the Kafka client.
* Returns the consumer throttling config.
*
* @return consumer configuration needed by the Kafka client.
* @return the config.
*/
Config getConsumerConfig();
ThrottlingConfig getConsumerThrottlingConfig();

/**
* Returns the Config for producers needed by the Kafka client.
Expand All @@ -45,4 +45,14 @@ public interface KafkaConfig {
*/
Config getProducerConfig();

/**
* @return number of maximum buffered messages before dropping them.
*/
int getProducerQueueSize();

/**
* @return number of maximum parallel message publications.
*/
int getProducerParallelism();

}
Expand Up @@ -16,6 +16,7 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.KafkaConfig;

import akka.actor.Props;

Expand Down Expand Up @@ -53,9 +54,12 @@ public String getActorName() {
}

@Override
public Props props(final Connection connection, final KafkaProducerFactory factory, final boolean dryRun,
public Props props(final Connection connection,
final KafkaConfig config,
final PropertiesFactory propertiesFactory,
final boolean dryRun,
final String clientId) {
return KafkaPublisherActor.props(connection, factory, dryRun, clientId);
return KafkaPublisherActor.props(connection, config, propertiesFactory, dryRun, clientId);
}

}
Expand Up @@ -53,7 +53,6 @@ public final class KafkaClientActor extends BaseClientActor {
private final KafkaPublisherActorFactory publisherActorFactory;
private final Set<ActorRef> pendingStatusReportsFromStreams;
private final PropertiesFactory propertiesFactory;
private final KafkaProducerFactory producerFactory;

private CompletableFuture<Status.Status> testConnectionFuture = null;
private ActorRef kafkaPublisherActor;
Expand All @@ -71,8 +70,6 @@ private KafkaClientActor(final Connection connection,
final ConnectionConfig connectionConfig = connectivityConfig.getConnectionConfig();
kafkaConfig = connectionConfig.getKafkaConfig();
propertiesFactory = PropertiesFactory.newInstance(connection, kafkaConfig, getClientId(connection.getId()));
producerFactory = DefaultKafkaProducerFactory.getInstance(propertiesFactory.getProducerSettings(),
getContext().getSystem());
this.publisherActorFactory = publisherActorFactory;
pendingStatusReportsFromStreams = new HashSet<>();
}
Expand Down Expand Up @@ -176,7 +173,7 @@ private void startKafkaPublisher(final boolean dryRun, final ConnectionId connec
// ensure no previous publisher stays in memory
stopPublisherActor();
final Props publisherActorProps =
publisherActorFactory.props(connection(), producerFactory, dryRun, getDefaultClientId());
publisherActorFactory.props(connection(), kafkaConfig, propertiesFactory, dryRun, getDefaultClientId());
kafkaPublisherActor = startChildActorConflictFree(publisherActorFactory.getActorName(), publisherActorProps);
pendingStatusReportsFromStreams.add(kafkaPublisherActor);
}
Expand Down

0 comments on commit 49a0c5e

Please sign in to comment.