Skip to content

Commit

Permalink
Make ConnectivitySignalEnrichmentFacade configurable via extension me…
Browse files Browse the repository at this point in the history
…chanism

* Make instantiate DittoCachingSignalEnrichmentFacade explicitly instead
  of using the extension mechanism here. The extension is only relevant for
  search.
* Instead of having two classes for caching and none-caching a single class
  handles it which can be configured

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 12, 2022
1 parent dbbb7d3 commit 0d0bad5
Show file tree
Hide file tree
Showing 21 changed files with 275 additions and 790 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@
*/
package org.eclipse.ditto.connectivity.service.mapping;

import java.util.List;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.internal.models.signalenrichment.DefaultSignalEnrichmentConfig;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentConfig;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;

import com.typesafe.config.Config;

import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;

/**
* Provider of {@link SignalEnrichmentFacade} to be loaded by reflection.
Expand All @@ -49,43 +48,29 @@ public interface ConnectivitySignalEnrichmentProvider extends DittoExtensionPoin
* @param actorSystem The actor system in which to load the facade provider class.
* @return The configured facade provider.
*/
static ConnectivitySignalEnrichmentProvider get(final ActorSystem actorSystem) {
return ExtensionId.INSTANCE.get(actorSystem);
static ConnectivitySignalEnrichmentProvider get(final ActorSystem actorSystem, final Config config) {
checkNotNull(actorSystem, "actorSystem");
checkNotNull(config, "config");
final var extensionIdConfig = ExtensionId.computeConfig(config);
return DittoExtensionIds.get(actorSystem)
.computeIfAbsent(extensionIdConfig, ExtensionId::new)
.get(actorSystem);
}

/**
* ID of the actor system extension to provide signal enrichment for connectivity.
*/
final class ExtensionId extends DittoExtensionPoint.ExtensionId<ConnectivitySignalEnrichmentProvider> {

private static final String SIGNAL_ENRICHMENT_CONFIG_PATH = "ditto.connectivity";
private static final String CONFIG_PATH = "ditto.connectivity.signal-enrichment.provider";

private static final ExtensionId INSTANCE = new ExtensionId(ConnectivitySignalEnrichmentProvider.class);

/**
* Returns the {@code ExtensionId} for the implementation that should be loaded.
*
* @param parentClass the class of the extensions for which an implementation should be loaded.
*/
public ExtensionId(final Class<ConnectivitySignalEnrichmentProvider> parentClass) {
super(parentClass);
}

@Override
public ConnectivitySignalEnrichmentProvider createExtension(final ExtendedActorSystem system) {
final SignalEnrichmentConfig signalEnrichmentConfig =
DefaultSignalEnrichmentConfig.of(
system.settings().config().getConfig(SIGNAL_ENRICHMENT_CONFIG_PATH));
private static final String CONFIG_KEY = "signal-enrichment-provider";
private static final String CONFIG_PATH = "ditto.extensions." + CONFIG_KEY;

return AkkaClassLoader.instantiate(system, ConnectivitySignalEnrichmentProvider.class,
getImplementation(system),
List.of(ActorSystem.class, SignalEnrichmentConfig.class),
List.of(system, signalEnrichmentConfig));
private ExtensionId(final ExtensionIdConfig<ConnectivitySignalEnrichmentProvider> extensionIdConfig) {
super(extensionIdConfig);
}

protected String getImplementation(final ExtendedActorSystem actorSystem) {
return actorSystem.settings().config().getString(getConfigPath());
static ExtensionIdConfig<ConnectivitySignalEnrichmentProvider> computeConfig(final Config config) {
return ExtensionIdConfig.of(ConnectivitySignalEnrichmentProvider.class, config, CONFIG_KEY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.mapping;

import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.edge.service.dispatching.EdgeCommandForwarderActor;
import org.eclipse.ditto.internal.models.signalenrichment.ByRoundTripSignalEnrichmentFacade;
import org.eclipse.ditto.internal.models.signalenrichment.DefaultSignalEnrichmentProviderConfig;
import org.eclipse.ditto.internal.models.signalenrichment.DittoCachingSignalEnrichmentFacade;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;

import com.typesafe.config.Config;

import akka.actor.ActorSystem;

/**
* Provider for Connectivity-service of signal-enriching facades that uses an async Caffeine cache in order to load
* extra data to enrich.
*/
public final class DefaultConnectivitySignalEnrichmentProvider implements ConnectivitySignalEnrichmentProvider {

private static final String COMMAND_FORWARDER_ACTOR_PATH =
"/user/connectivityRoot/" + EdgeCommandForwarderActor.ACTOR_NAME;
private static final String CACHE_DISPATCHER = "signal-enrichment-cache-dispatcher";
private final SignalEnrichmentFacade facade;

/**
* Instantiate this provider. Called by reflection.
*
* @param actorSystem The actor system for which this provider is instantiated.
*/
@SuppressWarnings("unused")
public DefaultConnectivitySignalEnrichmentProvider(final ActorSystem actorSystem, final Config config) {
final var commandHandler = actorSystem.actorSelection(COMMAND_FORWARDER_ACTOR_PATH);
final var providerConfig = DefaultSignalEnrichmentProviderConfig.of(config);
final var delegate = ByRoundTripSignalEnrichmentFacade.of(commandHandler, providerConfig.getAskTimeout());
if (providerConfig.isCachingEnabled()) {
final var cacheLoaderExecutor = actorSystem.dispatchers().lookup(CACHE_DISPATCHER);
facade = DittoCachingSignalEnrichmentFacade.newInstance(
delegate,
providerConfig.getCacheConfig(),
cacheLoaderExecutor,
"connectivity");
} else {
facade = delegate;
}

}

@Override
public SignalEnrichmentFacade getFacade(final ConnectionId connectionId) {
return facade;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
Expand Down Expand Up @@ -103,6 +104,7 @@
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.Pair;
Expand Down Expand Up @@ -156,7 +158,7 @@ private OutboundMappingProcessorActor(final ActorRef clientActor,
super(OutboundSignal.class, logger ->
logger.withMdcEntry(ConnectivityMdcEntryKey.CONNECTION_ID, connection.getId())
);

final ActorSystem system = context().system();
this.clientActor = clientActor;
this.outboundMappingProcessors = checkNotEmpty(outboundMappingProcessors, "outboundMappingProcessors");
this.connection = connection;
Expand All @@ -169,8 +171,9 @@ private OutboundMappingProcessorActor(final ActorRef clientActor,
responseDispatchedMonitor = connectionMonitorRegistry.forResponseDispatched(this.connection);
responseDroppedMonitor = connectionMonitorRegistry.forResponseDropped(this.connection);
responseMappedMonitor = connectionMonitorRegistry.forResponseMapped(this.connection);
signalEnrichmentFacade =
ConnectivitySignalEnrichmentProvider.get(getContext().getSystem()).getFacade(this.connection.getId());
signalEnrichmentFacade = ConnectivitySignalEnrichmentProvider.get(system,
ScopedConfig.dittoExtension(system.settings().config()))
.getFacade(this.connection.getId());
this.processorPoolSize = determinePoolSize(processorPoolSize, mappingConfig.getMaxPoolSize());
toErrorResponseFunction = DittoRuntimeExceptionToErrorResponseFunction.of(limitsConfig.getHeadersMaxSize());
}
Expand Down Expand Up @@ -701,7 +704,7 @@ private <T> CompletionStage<Collection<OutboundSignal.MultiMapped>> toMultiMappe
.toList();
final Predicate<AcknowledgementLabel> willPublish =
ConnectionValidator.getTargetIssuedAcknowledgementLabels(connection.getId(),
targetsToPublishAt)
targetsToPublishAt)
.collect(Collectors.toSet())::contains;
final var signalsWithoutEnrichmentFailures =
filterFailedEnrichments(outboundSignals, willPublish, context, logger);
Expand Down
52 changes: 23 additions & 29 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,34 @@ ditto {
connection-priority-provider-factory = "org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory"
# Factory for custom client actor props.
client-actor-props-factory = "org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory"

signal-enrichment-provider {
// Beware: Despite similarities with gateway signal-enrichment providers,
// this class is different and not compatible with them.
extension-class = "org.eclipse.ditto.connectivity.service.mapping.DefaultConnectivitySignalEnrichmentProvider"
extension-config {
cache {
# enable/disable caching
enabled = true
enabled = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_CACHE_ENABLED}
# how many things to cache in total on a single cluster node
maximum-size = 20000
maximum-size = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_CACHE_MAXIMUM_SIZE}

# maximum duration of inconsistency after e.g. a policy update
expire-after-create = 2m
expire-after-create = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_CACHE_EXPIRE_AFTER_CREATE}
}
# timeout for all facades
ask-timeout = 10s
ask-timeout = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_ASK_TIMEOUT}
}
}
}

persistence.operations.delay-after-persistence-actor-shutdown = 5s
persistence.operations.delay-after-persistence-actor-shutdown = ${?DELAY_AFTER_PERSISTENCE_ACTOR_SHUTDOWN}

signal-enrichment {
# Which caching signal enrichment facade is used by the signal-enrichment.provider
caching-signal-enrichment-facade.provider = org.eclipse.ditto.internal.models.signalenrichment.DittoCachingSignalEnrichmentFacadeProvider
caching-signal-enrichment-facade.provider = ${?CONNECTIVITY_CACHING_SIGNAL_ENRICHMENT_PROVIDER}
}

connectivity {
user-indicated-errors-base = [
# Kafka
Expand Down Expand Up @@ -644,29 +661,6 @@ ditto {
}
}

signal-enrichment {
// Beware: Despite similarities with gateway signal-enrichment providers,
// this class is different and not compatible with them.
provider = "org.eclipse.ditto.connectivity.service.mapping.ConnectivityCachingSignalEnrichmentProvider"
provider = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_PROVIDER}

provider-config {
# timeout for all facades
ask-timeout = 10s
ask-timeout = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_ASK_TIMEOUT}

cache {
# how many things to cache in total on a single cluster node
maximum-size = 20000
maximum-size = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_CACHE_MAXIMUM_SIZE}

# maximum duration of inconsistency after e.g. a policy update
expire-after-create = 2m
expire-after-create = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_CACHE_EXPIRE_AFTER_CREATE}
}
}
}

persistence-ping {
# journal tag to query to find our which connectionPersistenceActors to ping (reconnect)
journal-tag = ""
Expand Down
Loading

0 comments on commit 0d0bad5

Please sign in to comment.