Skip to content

Commit

Permalink
More control on event publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Nov 24, 2023
1 parent 3016eff commit 05b229e
Showing 1 changed file with 9 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,24 +148,15 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
LOGGER.debug("Publishing event in memory : \n{} ", events);
return Flux
.fromIterable(events)
.concatMap(t ->
Mono.defer(() -> {
Sinks.EmitResult emitResult = queue.tryEmitNext(t);
LOGGER.debug("Event publisher {}, {} buffered elements ( capacity = {} ), emitResult = {}, event = {}", topic, queue.scan(Scannable.Attr.BUFFERED), queue.scan(Scannable.Attr.CAPACITY), emitResult, t);
if (emitResult.isFailure()) {
return Mono.error(new RuntimeException("Error publishing to queue for %s : %s".formatted(topic, emitResult)));
} else {
return Mono.just("");
}
})
.retryWhen(Retry
.backoff(5, Duration.ofMillis(500))
.doBeforeRetry(ctx -> {
LOGGER.error("Error publishing to queue %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure());
})
)
.onErrorReturn("")
)
.map(t -> {
try {
queue.emitNext(t, Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1)));
return Tuple.empty();
} catch (Exception e) {
LOGGER.error("Error publishing to topic %s".formatted(topic), e);
return Tuple.empty();
}
})
.collectList()
.thenReturn(Tuple.empty())
.toFuture();
Expand Down

0 comments on commit 05b229e

Please sign in to comment.