Skip to content

Commit

Permalink
Add caching signal enrichment provider to configs
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Oct 25, 2021
1 parent 3772fef commit 486d663
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 20 deletions.
2 changes: 2 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,8 @@ ditto {
expire-after-create = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_CACHE_EXPIRE_AFTER_CREATE}
}
}
# Which caching signal enrichment facade is used by the signal-enrichment.provider
caching-signal-enrichment-facade.provider = ${?CONNECTIVITY_CACHING_SIGNAL_ENRICHMENT_PROVIDER}
}

persistence-ping {
Expand Down
4 changes: 3 additions & 1 deletion gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ditto {
actor-props-factory = "org.eclipse.ditto.gateway.service.endpoints.actors.DefaultHttpRequestActorPropsFactory"

# headers to read the proxy-protocol from for HTTPS enforcement
protocol-headers = [ "X-Forwarded-Proto", "x_forwarded_proto" ]
protocol-headers = ["X-Forwarded-Proto", "x_forwarded_proto"]

forcehttps = false
forcehttps = ${?FORCE_HTTPS}
Expand Down Expand Up @@ -81,6 +81,8 @@ ditto {
expire-after-create = 2m
expire-after-create = ${?GATEWAY_SIGNAL_ENRICHMENT_CACHE_EXPIRE_AFTER_CREATE}
}
# The caching signal enrichment provider to be used
caching-signal-enrichment-facade.provider = ${?GATEWAY_CACHING_SIGNAL_ENRICHMENT_PROVIDER}
}

acknowledgement {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,47 @@
* Instantiated once per cluster node so that it builds up a cache across all signal enrichments on a local cluster
* node.
*/
public final class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade{
public final class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade {

private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory
.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
private static final String CACHE_NAME_SUFFIX = "_signal_enrichment_cache";

private final Cache<CacheKey, JsonObject> extraFieldsCache;

DittoCachingSignalEnrichmentFacade(
final SignalEnrichmentFacade cacheLoaderFacade,
private DittoCachingSignalEnrichmentFacade(final SignalEnrichmentFacade cacheLoaderFacade,
final CacheConfig cacheConfig,
final Executor cacheLoaderExecutor,
final String cacheNamePrefix) {

final var cacheLoader =
SignalEnrichmentCacheLoader.of(checkNotNull(cacheLoaderFacade, "cacheLoaderFacade"));
final var cacheName = checkNotNull(cacheNamePrefix, "cacheNamePrefix") + CACHE_NAME_SUFFIX;
final var cacheLoader = SignalEnrichmentCacheLoader.of(cacheLoaderFacade);
final var cacheName = cacheNamePrefix + CACHE_NAME_SUFFIX;

extraFieldsCache = CacheFactory.createCache(
cacheLoader,
checkNotNull(cacheConfig, "cacheConfig"),
cacheConfig,
cacheName,
checkNotNull(cacheLoaderExecutor, "cacheLoaderExecutor"));
cacheLoaderExecutor);
}

/**
* Returns a new {@code DittoCachingSignalEnrichmentFacade} instance.
*
* @param cacheLoaderFacade the facade whose argument-result-pairs we are caching.
* @param cacheConfig the cache configuration to use for the cache.
* @param cacheLoaderExecutor the executor to use in order to asynchronously load cache entries.
* @param cacheNamePrefix the prefix to use as cacheName of the cache.
* @throws NullPointerException if any argument is null.
*/
public static DittoCachingSignalEnrichmentFacade newInstance(final SignalEnrichmentFacade cacheLoaderFacade,
final CacheConfig cacheConfig,
final Executor cacheLoaderExecutor,
final String cacheNamePrefix) {

return new DittoCachingSignalEnrichmentFacade(checkNotNull(cacheLoaderFacade, "cacheLoaderFacade"),
checkNotNull(cacheConfig, "cacheConfig"),
checkNotNull(cacheLoaderExecutor, "cacheLoaderExecutor"),
checkNotNull(cacheNamePrefix, "cacheNamePrefix"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public CachingSignalEnrichmentFacade getSignalEnrichmentFacade(final SignalEnric
final Executor cacheLoaderExecutor,
final String cacheNamePrefix) {

return new DittoCachingSignalEnrichmentFacade(cacheLoaderFacade, cacheConfig, cacheLoaderExecutor,
return DittoCachingSignalEnrichmentFacade.newInstance(cacheLoaderFacade, cacheConfig, cacheLoaderExecutor,
cacheNamePrefix);
}

Expand Down
4 changes: 4 additions & 0 deletions thingsearch/service/src/main/resources/things-search.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ ditto {
enabled = ${?INDEX_INITIALIZATION_ENABLED}
}

signal-enrichment {
caching-signal-enrichment-facade.provider = ${?THINGS_SEARCH_CACHING_SIGNAL_ENRICHMENT_PROVIDER}
}

updater {
max-idle-time = 15m
max-idle-time = ${?ACTIVITY_CHECK_INTERVAL}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import org.eclipse.ditto.thingsearch.service.persistence.write.model.ThingWriteModel;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

import com.typesafe.config.ConfigFactory;

Expand All @@ -62,11 +64,13 @@
import akka.stream.testkit.javadsl.TestSource;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
* Unit tests for {@link EnforcementFlow}.
* Unit tests for {@link EnforcementFlow}. Contains fix method order to allow for longer setup during first test.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public final class EnforcementFlowTest {

private static ActorSystem system;
Expand Down Expand Up @@ -104,7 +108,7 @@ public void updateThingAndPolicyRevisions() {

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.empty());
final EnforcementFlow underTest =
EnforcementFlow.of(streamConfig, thingsProbe.ref(), policiesProbe.ref(),
EnforcementFlow.of(system, streamConfig, thingsProbe.ref(), policiesProbe.ref(),
system.dispatchers().defaultGlobalDispatcher(), system.getScheduler());

materializeTestProbes(underTest.create(false, 1));
Expand Down Expand Up @@ -155,7 +159,7 @@ public void ignoreCacheWhenRequestedToUpdate() {

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.empty());
final EnforcementFlow underTest =
EnforcementFlow.of(streamConfig, thingsProbe.ref(), policiesProbe.ref(),
EnforcementFlow.of(system, streamConfig, thingsProbe.ref(), policiesProbe.ref(),
system.dispatchers().defaultGlobalDispatcher(), system.getScheduler());

materializeTestProbes(underTest.create(false, 1));
Expand Down Expand Up @@ -231,7 +235,7 @@ public void computeThingCacheValueFromThingEvents() {

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.empty());
final EnforcementFlow underTest =
EnforcementFlow.of(streamConfig, thingsProbe.ref(), policiesProbe.ref(),
EnforcementFlow.of(system, streamConfig, thingsProbe.ref(), policiesProbe.ref(),
system.dispatchers().defaultGlobalDispatcher(), system.getScheduler());

materializeTestProbes(underTest.create(false, 1));
Expand All @@ -244,7 +248,7 @@ public void computeThingCacheValueFromThingEvents() {
sourceProbe.sendComplete();

// WHEN: policy is retrieved with up-to-date revisions
policiesProbe.expectMsgClass(SudoRetrievePolicy.class);
policiesProbe.expectMsgClass(Duration.apply(30, TimeUnit.SECONDS), SudoRetrievePolicy.class);
final var policy = Policy.newBuilder(policyId).setRevision(1).build();
policiesProbe.reply(SudoRetrievePolicyResponse.of(policyId, policy, DittoHeaders.empty()));

Expand Down Expand Up @@ -292,7 +296,7 @@ public void forceRetrieveThing() {

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.empty());
final EnforcementFlow underTest =
EnforcementFlow.of(streamConfig, thingsProbe.ref(), policiesProbe.ref(),
EnforcementFlow.of(system, streamConfig, thingsProbe.ref(), policiesProbe.ref(),
system.dispatchers().defaultGlobalDispatcher(), system.getScheduler());

materializeTestProbes(underTest.create(false, 1));
Expand Down Expand Up @@ -337,7 +341,7 @@ public void eventSequenceNumberTooLow() {

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.empty());
final EnforcementFlow underTest =
EnforcementFlow.of(streamConfig, thingsProbe.ref(), policiesProbe.ref(),
EnforcementFlow.of(system, streamConfig, thingsProbe.ref(), policiesProbe.ref(),
system.dispatchers().defaultGlobalDispatcher(), system.getScheduler());

materializeTestProbes(underTest.create(false, 1));
Expand Down Expand Up @@ -381,7 +385,7 @@ public void eventMissed() {

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.empty());
final EnforcementFlow underTest =
EnforcementFlow.of(streamConfig, thingsProbe.ref(), policiesProbe.ref(),
EnforcementFlow.of(system, streamConfig, thingsProbe.ref(), policiesProbe.ref(),
system.dispatchers().defaultGlobalDispatcher(), system.getScheduler());

materializeTestProbes(underTest.create(false, 1));
Expand Down Expand Up @@ -424,7 +428,7 @@ public void noInitialCreatedOrDeletedEvent() {

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.empty());
final EnforcementFlow underTest =
EnforcementFlow.of(streamConfig, thingsProbe.ref(), policiesProbe.ref(),
EnforcementFlow.of(system, streamConfig, thingsProbe.ref(), policiesProbe.ref(),
system.dispatchers().defaultGlobalDispatcher(), system.getScheduler());

materializeTestProbes(underTest.create(false, 1));
Expand Down Expand Up @@ -491,7 +495,7 @@ public void onlyApplyRelevantEvents() {

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.empty());
final EnforcementFlow underTest =
EnforcementFlow.of(streamConfig, thingsProbe.ref(), policiesProbe.ref(),
EnforcementFlow.of(system, streamConfig, thingsProbe.ref(), policiesProbe.ref(),
system.dispatchers().defaultGlobalDispatcher(), system.getScheduler());

materializeTestProbes(underTest.create(false, 1));
Expand Down

0 comments on commit 486d663

Please sign in to comment.