Skip to content

Commit

Permalink
Make UpdatedConnectionTester a DittoExtensionPoint
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Jul 15, 2022
1 parent ae09d7a commit 780e68a
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 33 deletions.
Expand Up @@ -204,21 +204,19 @@ public final class ConnectionPersistenceActor
final Config connectivityConfigOverwrites) {

super(connectionId, new ConnectionMongoSnapshotAdapter());

this.updatedConnectionTester = UpdatedConnectionTester.getInstance(context().system());
final var actorSystem = getContext().getSystem();
cluster = Cluster.get(actorSystem);
final Config dittoExtensionConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
this.updatedConnectionTester = UpdatedConnectionTester.get(actorSystem, dittoExtensionConfig);
this.commandForwarderActor = commandForwarderActor;
propsFactory =
ClientActorPropsFactory.get(actorSystem, ScopedConfig.dittoExtension(actorSystem.settings().config()));
propsFactory = ClientActorPropsFactory.get(actorSystem, dittoExtensionConfig);
this.pubSubMediator = pubSubMediator;
this.connectivityConfigOverwrites = connectivityConfigOverwrites;
connectivityConfig = getConnectivityConfigWithOverwrites(connectivityConfigOverwrites);
commandValidator = getCommandValidator();
final ConnectionConfig connectionConfig = connectivityConfig.getConnectionConfig();
this.allClientActorsOnOneNode = allClientActorsOnOneNode.orElse(connectionConfig.areAllClientActorsOnOneNode());
final Config dittoExtensionsConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
connectionPriorityProvider = ConnectionPriorityProviderFactory.get(actorSystem, dittoExtensionsConfig)
connectionPriorityProvider = ConnectionPriorityProviderFactory.get(actorSystem, dittoExtensionConfig)
.newProvider(self(), log);
clientActorAskTimeout = connectionConfig.getClientActorAskTimeout();
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
Expand Down
Expand Up @@ -30,6 +30,8 @@
import org.eclipse.ditto.internal.utils.cluster.config.DefaultClusterConfig;
import org.slf4j.Logger;

import com.typesafe.config.Config;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
Expand All @@ -39,7 +41,7 @@ public final class DefaultUpdatedConnectionTester implements UpdatedConnectionTe
private static final Logger LOGGER = DittoLoggerFactory.getThreadSafeLogger(UpdatedConnectionTester.class);
private final ActorRef connectionShardRegion;

public DefaultUpdatedConnectionTester(final ActorSystem actorSystem) {
public DefaultUpdatedConnectionTester(final ActorSystem actorSystem, final Config config) {
final var clusterConfig =
DefaultClusterConfig.of(actorSystem.settings().config().getConfig("ditto.cluster"));
final ShardRegionProxyActorFactory shardRegionProxyActorFactory =
Expand Down
Expand Up @@ -12,25 +12,26 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.persistence;

import java.util.List;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.DittoExtensionIds;
import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.base.service.RootActorStarter;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.ModifyConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnectionResponse;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;

import akka.actor.AbstractExtensionId;
import com.typesafe.config.Config;

import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;

/**
* Allows testing of an updated connection without the need for authentication/authorization on API level.
*/
public interface UpdatedConnectionTester extends Extension {
public interface UpdatedConnectionTester extends DittoExtensionPoint {

/**
* Tests the given connection.
Expand All @@ -40,31 +41,33 @@ public interface UpdatedConnectionTester extends Extension {
* @param dittoHeaders the ditto headers that should be used for the test connection command
* @return A completion stage resolving to true, in case the connection could be tested successfully or false if not.
*/
CompletionStage<Optional<TestConnectionResponse>> testConnection(Connection updatedConnection, DittoHeaders dittoHeaders);
CompletionStage<Optional<TestConnectionResponse>> testConnection(Connection updatedConnection,
DittoHeaders dittoHeaders);

static UpdatedConnectionTester getInstance(final ActorSystem actorSystem) {
return ExtensionId.INSTANCE.get(actorSystem);
static UpdatedConnectionTester get(final ActorSystem actorSystem, final Config config) {
checkNotNull(actorSystem, "actorSystem");
checkNotNull(config, "config");
final var extensionIdConfig = ExtensionId.computeConfig(config);
return DittoExtensionIds.get(actorSystem)
.computeIfAbsent(extensionIdConfig, ExtensionId::new)
.get(actorSystem);
}

class ExtensionId extends AbstractExtensionId<UpdatedConnectionTester> {

private static final String IMPLEMENTATION_CONFIG_KEY = "ditto.connection-update-tester";
final class ExtensionId extends DittoExtensionPoint.ExtensionId<UpdatedConnectionTester> {

private static final ExtensionId INSTANCE = new ExtensionId();
private static final String CONFIG_KEY = "updated-connection-tester";

private ExtensionId() {}
private ExtensionId(final ExtensionIdConfig<UpdatedConnectionTester> extensionIdConfig) {
super(extensionIdConfig);
}

@Override

public UpdatedConnectionTester createExtension(final ExtendedActorSystem system) {
return AkkaClassLoader.instantiate(system, UpdatedConnectionTester.class,
getImplementation(system),
List.of(ActorSystem.class),
List.of(system));
protected String getConfigKey() {
return CONFIG_KEY;
}

private String getImplementation(final ExtendedActorSystem actorSystem) {
return actorSystem.settings().config().getString(IMPLEMENTATION_CONFIG_KEY);
static ExtensionIdConfig<UpdatedConnectionTester> computeConfig(final Config config) {
return ExtensionIdConfig.of(UpdatedConnectionTester.class, config, CONFIG_KEY);
}

}
Expand Down
2 changes: 1 addition & 1 deletion connectivity/service/src/main/resources/connectivity.conf
@@ -1,6 +1,5 @@
ditto {

connection-update-tester = "org.eclipse.ditto.connectivity.service.messaging.persistence.DefaultUpdatedConnectionTester"
service-name = "connectivity"

mongodb {
Expand All @@ -9,6 +8,7 @@ ditto {
}

extensions {
updated-connection-tester = "org.eclipse.ditto.connectivity.service.messaging.persistence.DefaultUpdatedConnectionTester"
# Factory for custom connection priority provider.
connection-priority-provider-factory = "org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory"
# Factory for custom client actor props.
Expand Down
Expand Up @@ -20,11 +20,13 @@
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnectionResponse;

import com.typesafe.config.Config;

import akka.actor.ActorSystem;

public final class AlwaysFailingUpdatedConnectionTester implements UpdatedConnectionTester{

private AlwaysFailingUpdatedConnectionTester(final ActorSystem actorSystem) {}
private AlwaysFailingUpdatedConnectionTester(final ActorSystem actorSystem, final Config config) {}

@Override
public CompletionStage<Optional<TestConnectionResponse>> testConnection(final Connection updatedConnection,
Expand Down
2 changes: 1 addition & 1 deletion connectivity/service/src/test/resources/test.conf
Expand Up @@ -20,7 +20,7 @@ akka.cluster.roles = ["thing-event-aware", "live-signal-aware", "acks-aware", "p

ditto {
extensions {
connection-update-tester = org.eclipse.ditto.connectivity.service.messaging.persistence.AlwaysFailingUpdatedConnectionTester
updated-connection-tester = org.eclipse.ditto.connectivity.service.messaging.persistence.AlwaysFailingUpdatedConnectionTester
connection-priority-provider-factory = org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProviderFactory
client-actor-props-factory = org.eclipse.ditto.connectivity.service.messaging.DefaultClientActorPropsFactory
signal-enrichment-provider {
Expand Down

0 comments on commit 780e68a

Please sign in to comment.