Skip to content

Commit

Permalink
Issue #561: added CachingSignalEnrichmentFacade
Browse files Browse the repository at this point in the history
* created new module "ditto-services-models-signalenrichment" in order to prevent circular dependencies
* enhanced EntityIdWithResourceType with CacheLookupContext containing additional context for doing cache lookups
* adjusted CacheLoaders in order to be able to select specific fields
* added new ThingEnrichmentCacheLoader
* did some renaming in order to have a consistent class naming scheme for SignalEnrichmentFacades and SignalEnrichmentProviders

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Jan 8, 2020
1 parent 41ed134 commit 6631a85
Show file tree
Hide file tree
Showing 48 changed files with 1,064 additions and 185 deletions.
5 changes: 5 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,11 @@
<artifactId>ditto-services-models-thingsearch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-models-signalenrichment</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
Expand Down
4 changes: 4 additions & 0 deletions services/connectivity/mapping/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-models-connectivity</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-models-signalenrichment</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-base</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@

import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.services.base.config.SignalEnrichmentConfig;
import org.eclipse.ditto.services.models.things.DefaultSignalEnrichmentFacadeByRoundTripConfig;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacade;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacadeByRoundTrip;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacadeByRoundTripConfig;
import org.eclipse.ditto.services.models.signalenrichment.ByRoundTripSignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.DefaultSignalEnrichmentFacadeByRoundTripConfig;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacadeByRoundTripConfig;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
* Provider for Connectivity-service of thing-enriching facades that make a round-trip for each query.
*/
public final class ConnectivityByRoundTripProvider implements ConnectivitySignalEnrichmentProvider {
public final class ConnectivityByRoundTripSignalEnrichmentProvider implements ConnectivitySignalEnrichmentProvider {

private final ActorRef commandHandler;
private final SignalEnrichmentFacadeByRoundTripConfig signalEnrichmentFacadeByRoundTripConfig;
Expand All @@ -38,7 +38,7 @@ public final class ConnectivityByRoundTripProvider implements ConnectivitySignal
* @param signalEnrichmentConfig Configuration for this provider.
*/
@SuppressWarnings("unused")
public ConnectivityByRoundTripProvider(final ActorSystem actorSystem, final ActorRef commandHandler,
public ConnectivityByRoundTripSignalEnrichmentProvider(final ActorSystem actorSystem, final ActorRef commandHandler,
final SignalEnrichmentConfig signalEnrichmentConfig) {
this.commandHandler = commandHandler;
signalEnrichmentFacadeByRoundTripConfig =
Expand All @@ -47,7 +47,7 @@ public ConnectivityByRoundTripProvider(final ActorSystem actorSystem, final Acto

@Override
public SignalEnrichmentFacade createFacade(final ConnectionId connectionId) {
return SignalEnrichmentFacadeByRoundTrip.of(commandHandler,
return ByRoundTripSignalEnrichmentFacade.of(commandHandler,
signalEnrichmentFacadeByRoundTripConfig.getAskTimeout());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.mapping;

import java.util.concurrent.Executor;

import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.services.base.config.SignalEnrichmentConfig;
import org.eclipse.ditto.services.models.signalenrichment.CachingSignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.CachingSignalEnrichmentFacadeConfig;
import org.eclipse.ditto.services.models.signalenrichment.DefaultCachingSignalEnrichmentFacadeConfig;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacade;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
* Provider for Connectivity-service of thing-enriching facades that uses an async Caffeine cache in order to load
* extra data to enrich.
*/
public final class ConnectivityCachingSignalEnrichmentProvider implements ConnectivitySignalEnrichmentProvider {

private final ActorRef commandHandler;
private final CachingSignalEnrichmentFacadeConfig cachingSignalEnrichmentFacadeConfig;
private final Executor cacheLoaderExecutor;

/**
* Instantiate this provider. Called by reflection.
*
* @param actorSystem The actor system for which this provider is instantiated.
* @param commandHandler The recipient of retrieve-thing commands - for this class this is the conciergeForwarder.
* @param signalEnrichmentConfig Configuration for this provider.
*/
@SuppressWarnings("unused")
public ConnectivityCachingSignalEnrichmentProvider(final ActorSystem actorSystem, final ActorRef commandHandler,
final SignalEnrichmentConfig signalEnrichmentConfig) {
this.commandHandler = commandHandler;
cachingSignalEnrichmentFacadeConfig =
DefaultCachingSignalEnrichmentFacadeConfig.of(signalEnrichmentConfig.getProviderConfig());
cacheLoaderExecutor = actorSystem.dispatchers().lookup("signal-enrichment-cache-dispatcher");
}

@Override
public SignalEnrichmentFacade createFacade(final ConnectionId connectionId) {
return CachingSignalEnrichmentFacade.of(commandHandler,
cachingSignalEnrichmentFacadeConfig.getAskTimeout(),
cachingSignalEnrichmentFacadeConfig.getCacheConfig(),
cacheLoaderExecutor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.services.base.config.SignalEnrichmentConfig;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.services.utils.akka.AkkaClassLoader;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
* Provider of {@link org.eclipse.ditto.services.models.things.SignalEnrichmentFacade} to be loaded by reflection.
* Provider of {@link SignalEnrichmentFacade} to be loaded by reflection.
* Implementations MUST have a public constructor taking the following parameters as arguments:
* <ul>
* <li>ActorSystem actorSystem: actor system in which this provider is loaded,</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
import akka.testkit.javadsl.TestKit;

/**
* Tests {@link ConnectivityByRoundTripProvider}.
* Tests {@link ConnectivityByRoundTripSignalEnrichmentProvider}.
*/
public final class ConnectivityByRoundTripProviderTest {
public final class ConnectivityByRoundTripSignalEnrichmentProviderTest {

private final SignalEnrichmentConfig config = DefaultSignalEnrichmentConfig.of(ConfigFactory.empty()
.withValue("signal-enrichment.provider",
ConfigValueFactory.fromAnyRef(ConnectivityByRoundTripProvider.class.getCanonicalName()))
ConfigValueFactory.fromAnyRef(ConnectivityByRoundTripSignalEnrichmentProvider.class.getCanonicalName()))
.withValue("signal-enrichment.provider-config.ask-timeout",
ConfigValueFactory.fromAnyRef(Duration.ofDays(1L))));

Expand All @@ -61,7 +61,7 @@ public void loadProvider() {
new TestKit(createActorSystem()) {{
final ConnectivitySignalEnrichmentProvider
underTest = ConnectivitySignalEnrichmentProvider.load(actorSystem, getRef(), config);
assertThat(underTest).isInstanceOf(ConnectivityByRoundTripProvider.class);
assertThat(underTest).isInstanceOf(ConnectivityByRoundTripSignalEnrichmentProvider.class);
}};
}

Expand Down
4 changes: 4 additions & 0 deletions services/connectivity/messaging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-models-connectivity</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-models-signalenrichment</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-utils-protocol</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,7 @@ private ActorRef startMessageMappingProcessorActor() {
* This however will also limit throughput as the used hashing key is often connection source address based
* and does not yet "know" of the Thing ID.
*/
// TODO TJ does this even resize dynamically? there is no resizer configured
return getContext().actorOf(new ConsistentHashingPool(connection.getProcessorPoolSize())
.withDispatcher("message-mapping-processor-dispatcher")
.props(props), MessageMappingProcessorActor.ACTOR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import org.eclipse.ditto.services.models.connectivity.MappedInboundExternalMessage;
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
import org.eclipse.ditto.services.models.connectivity.OutboundSignalFactory;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
Expand Down Expand Up @@ -283,7 +283,7 @@ private CompletionStage<Collection<OutboundSignalWithId>> enrichAndFilterSignal(
return CompletableFuture.completedFuture(Collections.singletonList(outboundSignal));
}
final JsonFieldSelector extraFields = filteredTopic.getExtraFields().get();
final Target target = outboundSignal.getTargets().get(0);
final Target target = outboundSignal.getTargets().get(0); // TODO TJ is this sufficient to get target 0?

final ThingId thingId = ThingId.of(outboundSignal.getEntityId());
final DittoHeaders headers = outboundSignal.getSource()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.eclipse.ditto.services.models.connectivity.ExternalMessageFactory;
import org.eclipse.ditto.services.models.connectivity.OutboundSignal;
import org.eclipse.ditto.services.models.connectivity.OutboundSignalFactory;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacadeByRoundTrip;
import org.eclipse.ditto.services.models.signalenrichment.ByRoundTripSignalEnrichmentFacade;
import org.eclipse.ditto.services.utils.protocol.ProtocolAdapterProvider;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
Expand Down Expand Up @@ -216,7 +216,7 @@ public void testSignalEnrichment() {
// THEN: MessageMappingProcessor loads a signal-enrichment-facade lazily
commandHandlerProbe.expectMsg(MessageMappingProcessorActor.Request.GET_SIGNAL_ENRICHMENT_PROVIDER);
commandHandlerProbe.reply((ConnectivitySignalEnrichmentProvider)
connectionId -> SignalEnrichmentFacadeByRoundTrip.of(getRef(), Duration.ofSeconds(10L)));
connectionId -> ByRoundTripSignalEnrichmentFacade.of(getRef(), Duration.ofSeconds(10L)));

// THEN: Receive a RetrieveThing command from the facade.
final RetrieveThing retrieveThing = expectMsgClass(RetrieveThing.class);
Expand Down
31 changes: 24 additions & 7 deletions services/connectivity/starter/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -189,21 +189,26 @@ ditto {
signal-enrichment {
// Beware: Despite similarities with gateway signal-enrichment providers,
// this class is different and not compatible with them.
provider = "org.eclipse.ditto.services.connectivity.mapping.ConnectivityByRoundTripProvider"
provider = "org.eclipse.ditto.services.connectivity.mapping.ConnectivityCachingSignalEnrichmentProvider"
provider = ${?CONNECTION_ENRICHMENT_PROVIDER}

provider-config {
# timeout for all facades
ask-timeout = 10s
ask-timeout = ${?CONNECTION_ENRICHMENT_ASK_TIMEOUT}

# how many things to cache per stream if a caching implementation is chosen
maximum-size = 25
maximum-size = ${?CONNECTION_ENRICHMENT_CACHE_MAXIMUM_SIZE}
cache {
# how many things to cache per stream if a caching implementation is chosen
maximum-size = 100
maximum-size = ${?CONNECTION_ENRICHMENT_CACHE_MAXIMUM_SIZE}

# maximum duration of inconsistency after missing a cache invalidation
expire-after-write = 60s
expire-after-write = ${?CONNECTION_ENRICHMENT_CACHE_EXPIRE_AFTER_WRITE}
# maximum duration of inconsistency after missing a cache invalidation
expire-after-write = 1h
expire-after-write = ${?CONNECTION_ENRICHMENT_CACHE_EXPIRE_AFTER_WRITE}

expire-after-access = 15m
expire-after-access = ${?CONNECTION_ENRICHMENT_CACHE_EXPIRE_AFTER_ACCESS}
}
}
}

Expand Down Expand Up @@ -373,4 +378,16 @@ jms-connection-handling-dispatcher {
executor = "thread-pool-executor"
}

signal-enrichment-cache-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
keep-alive-time = 60s
fixed-pool-size = off
max-pool-size-max = 256
max-pool-size-max = ${?CACHE_DISPATCHER_POOL_SIZE_MAX}
max-pool-size-max = ${?SIGNAL_ENRICHMENT_CACHE_DISPATCHER_POOL_SIZE_MAX}
}
}

include "connectivity-extension"
4 changes: 4 additions & 0 deletions services/gateway/endpoints/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
<artifactId>ditto-signals-events-policies</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-models-signalenrichment</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-services-models-thingsearch</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.eclipse.ditto.services.gateway.streaming.actors.EventAndResponsePublisher;
import org.eclipse.ditto.services.gateway.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.signals.events.things.ThingEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
import org.eclipse.ditto.services.gateway.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.services.gateway.streaming.actors.StreamingActor;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.services.utils.akka.controlflow.Filter;
import org.eclipse.ditto.services.utils.akka.controlflow.LimitRateByRejection;
import org.eclipse.ditto.services.utils.akka.logging.AutoCloseableSlf4jLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
package org.eclipse.ditto.services.gateway.endpoints.utils;

import org.eclipse.ditto.services.base.config.SignalEnrichmentConfig;
import org.eclipse.ditto.services.models.things.DefaultSignalEnrichmentFacadeByRoundTripConfig;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacade;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacadeByRoundTrip;
import org.eclipse.ditto.services.models.things.SignalEnrichmentFacadeByRoundTripConfig;
import org.eclipse.ditto.services.models.signalenrichment.ByRoundTripSignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.DefaultSignalEnrichmentFacadeByRoundTripConfig;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacadeByRoundTripConfig;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand All @@ -25,7 +25,7 @@
/**
* Provider for gateway-service of thing-enriching facades that make a round-trip for each query.
*/
public final class GatewayByRoundTripProvider implements GatewaySignalEnrichmentProvider {
public final class GatewayByRoundTripSignalEnrichmentProvider implements GatewaySignalEnrichmentProvider {

private final ActorRef commandHandler;
private final SignalEnrichmentFacadeByRoundTripConfig signalEnrichmentFacadeByRoundTripConfig;
Expand All @@ -38,7 +38,7 @@ public final class GatewayByRoundTripProvider implements GatewaySignalEnrichment
* @param signalEnrichmentConfig Configuration for this provider.
*/
@SuppressWarnings("unused")
public GatewayByRoundTripProvider(final ActorSystem actorSystem, final ActorRef commandHandler,
public GatewayByRoundTripSignalEnrichmentProvider(final ActorSystem actorSystem, final ActorRef commandHandler,
final SignalEnrichmentConfig signalEnrichmentConfig) {
this.commandHandler = commandHandler;
signalEnrichmentFacadeByRoundTripConfig =
Expand All @@ -47,7 +47,7 @@ public GatewayByRoundTripProvider(final ActorSystem actorSystem, final ActorRef

@Override
public SignalEnrichmentFacade createFacade(final HttpRequest request) {
return SignalEnrichmentFacadeByRoundTrip.of(commandHandler,
return ByRoundTripSignalEnrichmentFacade.of(commandHandler,
signalEnrichmentFacadeByRoundTripConfig.getAskTimeout());
}
}
Loading

0 comments on commit 6631a85

Please sign in to comment.