Skip to content

Commit

Permalink
Adjust logging/ add test
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Mar 7, 2022
1 parent fcdf92a commit a9b381e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,16 @@ private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoH
final DistributedPub<T> pub) {

// using pub/sub to publish the command to any interested parties (e.g. a Websocket):
log(signal).debug("Publish message to pub-sub: <{}>", signal);

if (enforcementConfig.shouldDispatchGlobally(signal)) {
return responseReceiverCache.insertResponseReceiverConflictFree(signal,
newSignal -> sender(),
(newSignal, receiver) -> publishSignal(newSignal, ackExtractor, pub));
(newSignal, receiver) -> {
log(newSignal).debug("Publish message to pub-sub: <{}>", newSignal);
return publishSignal(newSignal, ackExtractor, pub);
});
} else {
log(signal).debug("Publish message to pub-sub: <{}>", signal);
return CompletableFuture.completedStage(publishSignal(signal, ackExtractor, pub));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.events.Event;
Expand Down Expand Up @@ -119,7 +121,7 @@ public void rejectMessageCommandByPolicy() {
mockEntitiesActorInstance.setReply(TestSetup.POLICY_SUDO, sudoRetrievePolicyResponse);

final ActorRef underTest = newEnforcerActor(getRef());
underTest.tell(thingMessageCommand(), getRef());
underTest.tell(thingMessageCommand("abc"), getRef());
TestSetup.fishForMsgClass(this, MessageSendNotAllowedException.class);
}};
}
Expand Down Expand Up @@ -308,6 +310,63 @@ public void correlationIdSameAfterResponseSuccessful() {
}};
}

@Test
public void correlationIdDifferentInCaseOfConflict() {
final PolicyId policyId = PolicyId.of("policy:id");
final JsonObject thingWithPolicy = newThingWithPolicyId(policyId);
final JsonObject policy = PoliciesModelFactory.newPolicyBuilder(policyId)
.setRevision(1L)
.forLabel("authorize-self")
.setSubject(GOOGLE, SUBJECT_ID)
.setGrantedPermissions(PoliciesResourceType.messageResource(JsonPointer.empty()),
Permissions.newInstance(Permission.READ, Permission.WRITE))
.build()
.toJson(FieldType.all());
final SudoRetrieveThingResponse sudoRetrieveThingResponse =
SudoRetrieveThingResponse.of(thingWithPolicy, DittoHeaders.empty());
final SudoRetrievePolicyResponse sudoRetrievePolicyResponse =
SudoRetrievePolicyResponse.of(policyId, policy, DittoHeaders.empty());

new TestKit(system) {{
mockEntitiesActorInstance.setReply(TestSetup.THING_SUDO, sudoRetrieveThingResponse);
mockEntitiesActorInstance.setReply(TestSetup.POLICY_SUDO, sudoRetrievePolicyResponse);

final ActorRef underTest = newEnforcerActor(getRef());

final MessageCommand<?, ?> message = thingMessageCommand("abc");

underTest.tell(message, getRef());
final DistributedPubSubMediator.Publish firstPublishRead =
pubSubMediatorProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
assertThat(firstPublishRead.topic()).isEqualTo(StreamingType.MESSAGES.getDistributedPubSubTopic());
assertThat(firstPublishRead.msg()).isInstanceOf(MessageCommand.class);
assertThat((CharSequence) ((WithEntityId) firstPublishRead.msg()).getEntityId()).isEqualTo(
message.getEntityId());
assertThat((CharSequence) ((WithDittoHeaders) firstPublishRead.msg()).getDittoHeaders()
.getCorrelationId()
.orElseThrow()).isEqualTo(
message.getDittoHeaders().getCorrelationId().orElseThrow());

underTest.tell(message, getRef());
final DistributedPubSubMediator.Publish secondPublishRead =
pubSubMediatorProbe.expectMsgClass(DistributedPubSubMediator.Publish.class);
assertThat(secondPublishRead.topic()).isEqualTo(StreamingType.MESSAGES.getDistributedPubSubTopic());
assertThat(secondPublishRead.msg()).isInstanceOf(MessageCommand.class);
assertThat((CharSequence) ((WithEntityId) secondPublishRead.msg()).getEntityId()).isEqualTo(
message.getEntityId());
// Assure second command has suffixed correlation-id, because of conflict with first command.
assertThat((CharSequence) ((WithDittoHeaders) secondPublishRead.msg()).getDittoHeaders()
.getCorrelationId()
.orElseThrow()).startsWith(
message.getDittoHeaders().getCorrelationId().orElseThrow());
assertThat((CharSequence) ((WithDittoHeaders) secondPublishRead.msg()).getDittoHeaders()
.getCorrelationId()
.orElseThrow()).isNotEqualTo(
message.getDittoHeaders().getCorrelationId().orElseThrow());

}};
}

@Test
public void acceptMessageCommandByPolicy() {
final PolicyId policyId = PolicyId.of("policy:id");
Expand All @@ -331,7 +390,7 @@ public void acceptMessageCommandByPolicy() {

final ActorRef underTest = newEnforcerActor(getRef());

final MessageCommand<?, ?> msgCommand = thingMessageCommand();
final MessageCommand<?, ?> msgCommand = thingMessageCommand("abc");
mockEntitiesActorInstance.setReply(msgCommand);
underTest.tell(msgCommand, getRef());
final DistributedPubSubMediator.Publish publish =
Expand Down Expand Up @@ -474,10 +533,11 @@ private static ThingCommand<?> getModifyFeatureCommand(final DittoHeaders header
return ModifyFeature.of(TestSetup.THING_ID, TestSetup.FEATURE, headers);
}

private static MessageCommand<?, ?> thingMessageCommand() {
private static MessageCommand<?, ?> thingMessageCommand(final String correlationId) {
final Message<Object> message = Message.newBuilder(
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
.contentType("text/plain")
.correlationId(correlationId)
.build())
.payload("Hello you!")
.build();
Expand Down

0 comments on commit a9b381e

Please sign in to comment.