Skip to content

Commit

Permalink
fix handling weak ack for built-in ack labels in StreamingSessionActor
Browse files Browse the repository at this point in the history
* before an error to the WS session was sent declaring that ack-label 'live-response' was not declared if a WS subscribed for live messages, but filtered them out based on an RQL filter
* now this situation is handled correctly by dropping the weak ack

Signed-off-by: Thomas Jäckle <thomas.jaeckle@beyonnex.io>
  • Loading branch information
thjaeckle committed Jun 28, 2023
1 parent 54e25dd commit bc8e6a2
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotDeclaredException;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.FatalPubSubException;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
Expand Down Expand Up @@ -268,6 +269,7 @@ private Receive createIncomingSignalBehavior() {
.build();

final Receive signalBehavior = ReceiveBuilder.create()
.match(Acknowledgement.class, this::isWeakAckForBuiltInAckLabel, this::dropWeakAckForBuiltInAckLabelAcknowledgement)
.match(Acknowledgement.class, this::hasUndeclaredAckLabel, this::ackLabelNotDeclared)
.match(Acknowledgement.class, this::forwardAcknowledgementOrLiveCommandResponse)
.match(CommandResponse.class, CommandResponse::isLiveCommandResponse, liveCommandResponse ->
Expand Down Expand Up @@ -493,6 +495,16 @@ private static Receive addPreprocessors(final List<PartialFunction<Object, Objec
.orElse(receive);
}

private boolean isWeakAckForBuiltInAckLabel(final Acknowledgement acknowledgement) {
return acknowledgement.isWeak() &&
List.of(DittoAcknowledgementLabel.values()).contains(acknowledgement.getLabel());
}

private void dropWeakAckForBuiltInAckLabelAcknowledgement(final Acknowledgement ack) {
logger.withCorrelationId(ack)
.info("Dropping weak ack for built-in ack label <{}>", ack.getLabel());
}

private boolean hasUndeclaredAckLabel(final Acknowledgement acknowledgement) {
return !declaredAcks.contains(acknowledgement.getLabel());
}
Expand Down

0 comments on commit bc8e6a2

Please sign in to comment.