Skip to content

Commit

Permalink
moved LiveResponseAndAcknowledgementForwarder to things-service-enfor…
Browse files Browse the repository at this point in the history
…cement

* removing the dependency of policies-service to ditto-things-api
* removed no longer needed ThingEnforcementIdCacheLoader
* fixed the depdendencies of ditto-internal-utils-cache-loaders, not requiring models any longer
* fixed some more TODOs

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed May 4, 2022
1 parent 409d7f0 commit 8229d0a
Show file tree
Hide file tree
Showing 21 changed files with 52 additions and 242 deletions.
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.auth.AuthorizationSubject;
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
Expand Down Expand Up @@ -84,7 +85,9 @@ protected Object getCreateEntityCommand(final ConnectionId id) {
"amqp://user:pass@8.8.8.8:5671")
.sources(Collections.singletonList(source))
.build();
return CreateConnection.of(connection, DittoHeaders.empty());
return CreateConnection.of(connection, DittoHeaders.newBuilder()
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true")
.build());
}

@Override
Expand All @@ -94,7 +97,9 @@ protected Class<?> getCreateEntityResponseClass() {

@Override
protected Object getRetrieveEntityCommand(final ConnectionId id) {
return RetrieveConnection.of(id, DittoHeaders.empty());
return RetrieveConnection.of(id, DittoHeaders.newBuilder()
.putHeader(DittoHeaderDefinition.DITTO_SUDO.getKey(), "true")
.build());
}

@Override
Expand Down
Expand Up @@ -66,7 +66,7 @@ private ConciergeForwarderActor(final ActorRef pubSubMediator, final ShardRegion
* Creates Akka configuration object Props for this actor.
*
* @param pubSubMediator the PubSub mediator Actor.
* @param shardRegions TODO TJ doc
* @param shardRegions shard regions to use in order to dispatch different entity Signals to.
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef pubSubMediator, final ShardRegions shardRegions) {
Expand All @@ -78,7 +78,7 @@ public static Props props(final ActorRef pubSubMediator, final ShardRegions shar
* Creates Akka configuration object Props for this actor.
*
* @param pubSubMediator the PubSub mediator Actor.
* @param shardRegions TODO TJ doc
* @param shardRegions shard regions to use in order to dispatch different entity Signals to.
* @param signalTransformer a function which transforms signals before forwarding them to the responsible
* {@code shardRegion}
* @return the Akka configuration Props object.
Expand Down
20 changes: 2 additions & 18 deletions internal/utils/cache-loaders/pom.xml
Expand Up @@ -27,28 +27,12 @@
<dependencies>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-cache</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-policies-model</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-things-model</artifactId>
<artifactId>ditto-internal-utils-akka</artifactId>
</dependency>

<!-- for Sudo commands: -->
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-things-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-policies-api</artifactId>
<artifactId>ditto-internal-utils-cache</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
Expand Down

This file was deleted.

This file was deleted.

Expand Up @@ -28,11 +28,10 @@

import org.eclipse.ditto.base.model.exceptions.AskException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.commands.CommandHeaderInvalidException;
import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cacheloaders.config.DefaultAskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cacheloaders.config.RetryStrategy;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.exceptions.ThingNotAccessibleException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -365,13 +364,13 @@ private static void ensureDoesNotRetryOnSentDittoRuntimeException(final Duration
final CompletionStage<Object> retryStage = buildRetryStage(getRef(), configMap);

expectMsg(ASK_MESSAGE);
reply(ThingNotAccessibleException.newBuilder(ThingId.generateRandom()).build());
reply(CommandHeaderInvalidException.newBuilder("just-for-testing").build());
expectNoMessage(DEFAULT_NO_MESSAGE_EXPECTATION_DURATION);

assertThat(retryStage)
.failsWithin(askTimeout.multipliedBy(3))
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(ThingNotAccessibleException.class);
.withCauseInstanceOf(CommandHeaderInvalidException.class);
}};
}

Expand Down
Expand Up @@ -21,10 +21,7 @@
import java.util.UUID;

import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.policies.model.PolicyConstants;
import org.eclipse.ditto.things.model.ThingConstants;
import org.junit.AfterClass;
import org.junit.Ignore;
import org.junit.Test;

import akka.actor.ActorRef;
Expand Down Expand Up @@ -65,9 +62,9 @@ public void testHashCodeAndEquals() {
@Test
public void buildAndLookup() {
// GIVEN
final EntityType thingType = ThingConstants.ENTITY_TYPE;
final EntityType thingType = EntityType.of("thing");
final ActorRef actorRef1 = createTestActorRef("ref1");
final EntityType policyType = PolicyConstants.ENTITY_TYPE;
final EntityType policyType = EntityType.of("policy");
final ActorRef actorRef2 = createTestActorRef("ref2");

// WHEN
Expand All @@ -90,7 +87,7 @@ public void buildWithNullResourceTypeThrowsException() {

@Test(expected = NullPointerException.class)
public void buildWithNullActorRefThrowsException() {
EntityRegionMap.newBuilder().put(ThingConstants.ENTITY_TYPE, null);
EntityRegionMap.newBuilder().put(EntityType.of("thing"), null);
}

private static ActorRef createTestActorRef(final String actorNamePrefix) {
Expand Down
1 change: 1 addition & 0 deletions internal/utils/pubsub/pom.xml
Expand Up @@ -30,6 +30,7 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-things-model</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-akka</artifactId>
Expand Down
Expand Up @@ -16,8 +16,7 @@
import java.util.Optional;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.base.model.signals.commands.Command;

/**
* Enumeration of the different types which can be streamed (e.g. to an open Websocket connection). Each
Expand All @@ -28,7 +27,7 @@ public enum StreamingType {
/**
* Streaming type of thing events.
*/
EVENTS(ThingEvent.TYPE_PREFIX),
EVENTS("things.events:"),

/**
* Streaming type of message commands. The pubsub topic must be equal to the type prefix of message commands.
Expand Down Expand Up @@ -109,7 +108,7 @@ public static Optional<StreamingType> fromSignal(final Signal<?> signal) {
result = LIVE_EVENTS;
} else if (signal.getType().startsWith(MESSAGES.getDistributedPubSubTopic())) {
result = MESSAGES;
} else if (signal instanceof ThingCommand) {
} else if (signal instanceof Command<?>) {
result = LIVE_COMMANDS;
} else {
result = null;
Expand Down

0 comments on commit 8229d0a

Please sign in to comment.