Skip to content

Commit

Permalink
fixed unit tests affected by switch to acknowledgement aggregator add…
Browse files Browse the repository at this point in the history
…ress pass through

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 22, 2022
1 parent 6de622d commit 28ff503
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public void testAutoAck() throws Exception {
final TestProbe probe = new TestProbe(actorSystem);
setupMocks(probe);
final OutboundSignal.MultiMapped multiMapped = OutboundSignalFactory.newMultiMappedOutboundSignal(
List.of(getMockOutboundSignalWithAutoAck("please-verify")),
List.of(getMockOutboundSignalWithAutoAck("please-verify",
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())),
getRef()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void testTargetIssuedAcknowledgement() {
final AcknowledgementLabel acknowledgementLabel = getTestAck(connectionId);
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.acknowledgementRequest(AcknowledgementRequest.of(acknowledgementLabel))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.readGrantedSubjects(Collections.singleton(TestConstants.Authorization.SUBJECT))
.timeout("2s")
.randomCorrelationId()
Expand Down Expand Up @@ -229,6 +230,7 @@ public void sendAckWithNotDeclaredLabel() {
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.acknowledgementRequest(AcknowledgementRequest.of(acknowledgementLabel))
.readGrantedSubjects(Collections.singleton(TestConstants.Authorization.SUBJECT))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.timeout("2s")
.randomCorrelationId()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.entity.metadata.Metadata;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
Expand Down Expand Up @@ -105,8 +106,8 @@ public void sendWeakAckForAllSourcesAndTargetsWhenDroppedByAllTargets() {
final OutboundSignal outboundSignal = outboundTwinEvent(
Attributes.newBuilder().build(),
List.of("source1", "target1", "target2", "unknown"),
List.of(target2()) // authorized target2 will drop the signal after filtering
);
List.of(target2()), // authorized target2 will drop the signal after filtering
getRef());
underTest.tell(outboundSignal, getRef());
proxyActorProbe.expectMsgClass(RetrieveThing.class);
proxyActorProbe.reply(retrieveThingResponse(Attributes.newBuilder().build()));
Expand All @@ -133,8 +134,8 @@ public void eventsWithFailedEnrichmentIssueFailedAcks() {
final OutboundSignal outboundSignal = outboundTwinEvent(
Attributes.newBuilder().set("target2", "wayne").build(),
List.of("source1", "target1", "target2", "unknown"),
List.of(target1(), target2())
);
List.of(target1(), target2()),
getRef());
underTest.tell(outboundSignal, getRef());
proxyActorProbe.expectMsgClass(RetrieveThing.class);

Expand All @@ -160,8 +161,8 @@ public void sendWeakAckWhenDroppedBySomeTarget() {
final OutboundSignal outboundSignal = outboundTwinEvent(
Attributes.newBuilder().build(),
List.of("source1", "target1", "target2", "unknown"),
List.of(target1(), target2())
);
List.of(target1(), target2()),
getRef());
underTest.tell(outboundSignal, getRef());
proxyActorProbe.expectMsgClass(RetrieveThing.class);
proxyActorProbe.reply(retrieveThingResponse(Attributes.newBuilder().build()));
Expand Down Expand Up @@ -190,8 +191,8 @@ public void sendWeakAckWhenDroppedByMapper() {
final OutboundSignal outboundSignal = outboundTwinEvent(
Attributes.newBuilder().build(),
List.of("source1", "target1", "target3"),
List.of(target1(), target3())
);
List.of(target1(), target3()),
getRef());
underTest.tell(outboundSignal, getRef());

// THEN: sender receives weak acknowledgement only for the target that dropped it
Expand All @@ -218,7 +219,8 @@ public void doNotSendWeakAckForLiveResponse() {
final OutboundSignal outboundSignal = outboundLiveEvent(
Attributes.newBuilder().build(),
List.of("source1", "target1", "target3", "live-response"),
List.of(target1(), target3(), target4())
List.of(target1(), target3(), target4()),
getRef()
);
underTest.tell(outboundSignal, getRef());

Expand All @@ -245,8 +247,8 @@ public void expectNoTargetIssuedAckRequestInPublishedSignals() {
final OutboundSignal outboundSignal = outboundTwinEvent(
Attributes.newBuilder().build(),
List.of("source1", "target1", "target2"),
List.of(target1())
);
List.of(target1()),
getRef());
underTest.tell(outboundSignal, getRef());

// THEN: only the source-declared ack is present in the published signal
Expand All @@ -266,22 +268,23 @@ private List<OutboundMappingProcessor> getProcessors() {
}

private static OutboundSignal outboundTwinEvent(final Attributes attributes, final Collection<String> requestedAcks,
final List<Target> targets) {
final List<Target> targets, final ActorRef testRef) {
final Thing thing = Thing.newBuilder().setId(thingId())
.setAttributes(attributes)
.build();
final ThingModified thingModified = ThingModified.of(thing, 2L, Instant.EPOCH, DittoHeaders.newBuilder()
.acknowledgementRequests(requestedAcks.stream()
.map(AcknowledgementRequest::parseAcknowledgementRequest)
.toList())
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), testRef.path().toSerializationFormat())
.build(),
Metadata.newMetadata(JsonObject.empty()));
return OutboundSignalFactory.newOutboundSignal(thingModified, targets);
}

private static OutboundSignal outboundLiveEvent(final Attributes attributes, final Collection<String> requestedAcks,
final List<Target> targets) {
final OutboundSignal outboundTwinEvent = outboundTwinEvent(attributes, requestedAcks, targets);
final List<Target> targets, final ActorRef testRef) {
final OutboundSignal outboundTwinEvent = outboundTwinEvent(attributes, requestedAcks, targets, testRef);
return OutboundSignalFactory.newOutboundSignal(
outboundTwinEvent.getSource()
.setDittoHeaders(outboundTwinEvent.getSource().getDittoHeaders().toBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithOptionalEntity;
Expand Down Expand Up @@ -87,7 +88,7 @@
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;

public class AmqpPublisherActorTest extends AbstractPublisherActorTest {
public final class AmqpPublisherActorTest extends AbstractPublisherActorTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AmqpPublisherActorTest.class);
private static final String ANOTHER_ADDRESS = "anotherAddress";
Expand Down Expand Up @@ -156,6 +157,7 @@ public void testMsgPublishedOntoFullQueueShallBeDropped() throws Exception {
.build();
final DittoHeaders withAckRequest = dittoHeaders.toBuilder()
.acknowledgementRequest(AcknowledgementRequest.of(AcknowledgementLabel.of(ack)))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.build();

final Props props = getPublisherActorProps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.common.DittoConstants;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
Expand Down Expand Up @@ -322,6 +323,7 @@ public void testMessageCommandHttpPushWithNonLiveResponseIssuedAcknowledgement()
.putHeader("device_id", "ditto:thing")
.acknowledgementRequest(AcknowledgementRequest.of(DittoAcknowledgementLabel.LIVE_RESPONSE),
AcknowledgementRequest.of(autoAckLabel))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.build();
final Signal<?> source = SendThingMessage.of(TestConstants.Things.THING_ID, message, dittoHeaders);
final var outboundSignal =
Expand Down Expand Up @@ -528,6 +530,7 @@ public void sendingLiveResponseWithWrongCorrelationIdDoesNotWork() {
.correlationId(TestConstants.CORRELATION_ID)
.putHeader("device_id", "ditto:thing")
.acknowledgementRequest(AcknowledgementRequest.of(DittoAcknowledgementLabel.LIVE_RESPONSE))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.build();
final Signal<?> source = SendThingMessage.of(TestConstants.Things.THING_ID, message, dittoHeaders);
final var outboundSignal =
Expand Down Expand Up @@ -605,6 +608,7 @@ public void sendingLiveResponseToDifferentThingIdDoesNotWork() {
.correlationId(TestConstants.CORRELATION_ID)
.putHeader("device_id", "ditto:thing")
.acknowledgementRequest(AcknowledgementRequest.of(DittoAcknowledgementLabel.LIVE_RESPONSE))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.build();
final Signal<?> source = SendThingMessage.of(TestConstants.Things.THING_ID, message, dittoHeaders);
final var outboundSignal =
Expand Down Expand Up @@ -685,6 +689,7 @@ public void sendingWrongResponseTypeDoesNotWork() {
.correlationId(TestConstants.CORRELATION_ID)
.putHeader("device_id", "ditto:thing")
.acknowledgementRequest(AcknowledgementRequest.of(DittoAcknowledgementLabel.LIVE_RESPONSE))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.build();
final Signal<?> source = SendThingMessage.of(TestConstants.Things.THING_ID, message, dittoHeaders);
final var outboundSignal =
Expand Down Expand Up @@ -1016,8 +1021,10 @@ private T getValue() {

private OutboundSignal.MultiMapped newMultiMappedWithContentType(final Target target, final ActorRef sender) {
return OutboundSignalFactory.newMultiMappedOutboundSignal(
List.of(getMockOutboundSignal(target, "requested-acks",
JsonArray.of(JsonValue.of("please-verify")).toString())), sender);
List.of(getMockOutboundSignal(target,
"requested-acks", JsonArray.of(JsonValue.of("please-verify")).toString(),
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), sender.path().toSerializationFormat()))
, sender);
}

private HttpPushFactory mockHttpPushFactory(final String contentType, final HttpStatus httpStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ public void testMessageDroppedOnQueueOverflow() {
IntStream.range(0, kafkaConfig.getQueueSize() * 2)
.forEach(i -> {
final OutboundSignal.Mapped signal = getMockOutboundSignalWithAutoAck("aight",
DittoHeaderDefinition.CORRELATION_ID.getKey(), "msg" + i);
DittoHeaderDefinition.CORRELATION_ID.getKey(), "msg" + i,
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat());
final OutboundSignal.MultiMapped multiMapped =
OutboundSignalFactory.newMultiMappedOutboundSignal(List.of(
signal), getRef());
Expand Down Expand Up @@ -227,7 +228,8 @@ public void testAllQueuedMessagesAreFinallyPublished() {
mockSendProducerFactory = MockSendProducerFactory.getSlowStartInstance(TARGET_TOPIC, published);

final OutboundSignal.MultiMapped multiMapped =
OutboundSignalFactory.newMultiMappedOutboundSignal(List.of(getMockOutboundSignalWithAutoAck("ack")),
OutboundSignalFactory.newMultiMappedOutboundSignal(List.of(getMockOutboundSignalWithAutoAck("ack",
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())),
getRef());

final Props props = getPublisherActorProps();
Expand Down Expand Up @@ -261,6 +263,7 @@ public void verifyAcknowledgementsWithDebugEnabled() {
.correlationId(TestConstants.CORRELATION_ID)
.putHeader("device_id", "ditto:thing")
.acknowledgementRequest(AcknowledgementRequest.of(acknowledgementLabel))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), getRef().path().toSerializationFormat())
.build();
final Target target = ConnectivityModelFactory.newTargetBuilder()
.address(getOutboundAddress())
Expand Down Expand Up @@ -361,7 +364,8 @@ private void testSendFailure(final RuntimeException exception, final BiConsumer<

final TestProbe senderProbe = TestProbe.apply("sender", actorSystem);
final OutboundSignal.MultiMapped multiMapped = OutboundSignalFactory.newMultiMappedOutboundSignal(
List.of(getMockOutboundSignalWithAutoAck(exception.getClass().getSimpleName())),
List.of(getMockOutboundSignalWithAutoAck(exception.getClass().getSimpleName(),
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), senderProbe.ref().path().toSerializationFormat())),
senderProbe.ref()
);
final ActorRef publisherActor = childActorOf(getPublisherActorProps());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.function.Supplier;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgements;
import org.eclipse.ditto.connectivity.api.OutboundSignal;
Expand Down Expand Up @@ -67,12 +68,24 @@ public void testMultipleAcknowledgements() throws Exception {
final int signalCount = 6;
final OutboundSignal.MultiMapped multiMapped =
OutboundSignalFactory.newMultiMappedOutboundSignal(List.of(
getMockOutboundSignalWithAutoAck("rabbit1"),
getMockOutboundSignalWithAutoAck("rabbit2"),
getMockOutboundSignalWithAutoAck("rabbit3"),
getMockOutboundSignalWithAutoAck("rabbit4"),
getMockOutboundSignalWithAutoAck("rabbit5"),
getMockOutboundSignalWithAutoAck("rabbit6")
getMockOutboundSignalWithAutoAck("rabbit1",
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
getRef().path().toSerializationFormat()),
getMockOutboundSignalWithAutoAck("rabbit2",
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
getRef().path().toSerializationFormat()),
getMockOutboundSignalWithAutoAck("rabbit3",
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
getRef().path().toSerializationFormat()),
getMockOutboundSignalWithAutoAck("rabbit4",
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
getRef().path().toSerializationFormat()),
getMockOutboundSignalWithAutoAck("rabbit5",
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
getRef().path().toSerializationFormat()),
getMockOutboundSignalWithAutoAck("rabbit6",
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
getRef().path().toSerializationFormat())
), getRef());

final Props props = getPublisherActorProps();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ private BiConsumer<Object, Throwable> handleSignalEnforcementResponse(final With
.info("Received DittoRuntimeException during enforcement or " +
"forwarding to target actor, telling sender: {}", dre);
sender.tell(dre, getSelf());
} else if (response instanceof Status.Success success) {
log.debug("Ignoring Status.Success message as expected 'to be ignored' outcome: <{}>", success);
} else if (null != response) {
sender.tell(response, getSelf());
} else {
Expand Down

0 comments on commit 28ff503

Please sign in to comment.