Skip to content

Commit

Permalink
remove debug logs in stream which were used during development
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 26, 2021
1 parent 0008ecf commit b8f72b5
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ public CompletionStage<Done> start() throws IllegalStateException {
.runWith(unexpectedMessageSink(), materializer);
}


@Override
public void stop() {
if (consumerControl != null) {
Expand All @@ -110,8 +109,6 @@ public void stop() {
private Sink<KafkaAcknowledgableMessage, NotUsed> committerSink() {
return Flow.of(KafkaAcknowledgableMessage.class)
.mapAsync(1, KafkaAcknowledgableMessage::getAcknowledgementFuture)
.log("yannic test")
.withAttributes(Attributes.logLevels(Attributes.logLevelInfo(), Attributes.logLevelInfo(), Attributes.logLevelInfo()))
.to(Committer.sink(committerSettings));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ final class KafkaAcknowledgableMessage {
final ConsumerMessage.CommittableOffset committableOffset) {
this.acknowledgementFuture = new CompletableFuture<>();
this.acknowledgeableMessage = AcknowledgeableMessage.of(message,
() ->
acknowledgementFuture.complete(committableOffset),
() -> acknowledgementFuture.complete(committableOffset),
shouldRedeliver -> {
if (shouldRedeliver) {
acknowledgementFuture.completeExceptionally(MessageRejectedException.getInstance());
Expand Down

0 comments on commit b8f72b5

Please sign in to comment.