Skip to content

Commit

Permalink
#586 disable parallel publishing, use separate dispatcher for publishing
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Jun 15, 2021
1 parent 20b3b40 commit 3d86374
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import akka.Done;
import akka.actor.Props;
import akka.actor.Status;
import akka.dispatch.MessageDispatcher;
import akka.japi.Pair;
import akka.japi.pf.ReceiveBuilder;
import akka.kafka.javadsl.SendProducer;
Expand All @@ -75,9 +76,12 @@
final class KafkaPublisherActor extends BasePublisherActor<KafkaPublishTarget> {

static final String ACTOR_NAME = "kafkaPublisher";
private static final String DISPATCHER_NAME = "kafka-connection-dispatcher";

private final boolean dryRun;
private final KafkaProducerStream producerStream;
private final MessageDispatcher kafkaConnectionDispatcher;


@SuppressWarnings("unused")
private KafkaPublisherActor(final Connection connection,
Expand All @@ -88,6 +92,7 @@ private KafkaPublisherActor(final Connection connection,
super(connection, clientId);
this.dryRun = dryRun;
final Materializer materializer = Materializer.createMaterializer(this::getContext);
kafkaConnectionDispatcher = getContext().getSystem().dispatchers().lookup(DISPATCHER_NAME);
producerStream = new KafkaProducerStream(config, materializer, producerFactory);
reportInitialConnectionState();
}
Expand Down Expand Up @@ -316,14 +321,16 @@ private KafkaProducerStream(final KafkaConfig config, final Materializer materia

final SendProducer<String, String> sendProducer = producerFactory.newSendProducer();


final Pair<Pair<SourceQueueWithComplete<Pair<ProducerRecord<String, String>, KafkaMessageContext>>, UniqueKillSwitch>, CompletionStage<Done>>
mat = Source.<Pair<ProducerRecord<String, String>, KafkaMessageContext>>queue(
config.getProducerQueueSize(),
OverflowStrategy.dropNew())
.mapAsync(config.getProducerParallelism(), msg -> {
.mapAsync(1 /* temp no parallelism*/, msg -> {
final ProducerRecord<String, String> record = msg.first();
final KafkaMessageContext context = msg.second();
return context.onPublishMessage(sendProducer, record);
return CompletableFuture.supplyAsync(() -> context.onPublishMessage(sendProducer, record),
kafkaConnectionDispatcher);
})
.viaMat(KillSwitches.single(), Keep.both())
.toMat(Sink.ignore(), Keep.both())
Expand Down

0 comments on commit 3d86374

Please sign in to comment.