Skip to content

Commit

Permalink
Revert "Remove unnecessary SignalTransformer extension"
Browse files Browse the repository at this point in the history
This reverts commit 061fa0f.

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed May 30, 2022
1 parent b302ade commit d2cbef7
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 17 deletions.
2 changes: 1 addition & 1 deletion connectivity/service/src/main/resources/connectivity.conf
Expand Up @@ -962,7 +962,7 @@ akka {
}

include "ditto-protocol-subscriber.conf"
include "ditto-edge-api.conf"
include "ditto-edge-service.conf"

akka-contrib-mongodb-persistence-connection-journal {
class = "akka.contrib.persistence.mongodb.MongoJournal"
Expand Down
Expand Up @@ -16,6 +16,8 @@

import javax.annotation.Nullable;

import java.util.function.Function;

import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
Expand Down Expand Up @@ -63,6 +65,7 @@ public class EdgeCommandForwarderActor extends AbstractActor {

private final ActorRef pubSubMediator;
private final ShardRegions shardRegions;
private final Function<Signal<?>, Signal<?>> signalTransformer;

private final DefaultNamespaceProvider defaultNamespaceProvider;

Expand All @@ -73,6 +76,7 @@ private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegi
this.pubSubMediator = pubSubMediator;
this.shardRegions = shardRegions;
this.defaultNamespaceProvider = defaultNamespaceProvider;
signalTransformer = SignalTransformer.get(getContext().getSystem());
}

/**
Expand Down Expand Up @@ -139,19 +143,22 @@ private void handleCreateThing(final CreateThing createThing) {
}

private void forwardToThings(final MessageCommand<?, ?> messageCommand) {
log.withCorrelationId(messageCommand)
final MessageCommand<?, ?> transformedMessageCommand =
(MessageCommand<?, ?>) signalTransformer.apply(messageCommand);
log.withCorrelationId(transformedMessageCommand)
.info("Forwarding message command with ID <{}> and type <{}> to 'things' shard region",
messageCommand.getEntityId(), messageCommand.getType());
transformedMessageCommand.getEntityId(), transformedMessageCommand.getType());
shardRegions.things()
.forward(messageCommand, getContext());
.forward(transformedMessageCommand, getContext());
}

private void forwardToThings(final ThingCommand<?> thingCommand) {
log.withCorrelationId(thingCommand)
final ThingCommand<?> transformedThingCommand = (ThingCommand<?>) signalTransformer.apply(thingCommand);
log.withCorrelationId(transformedThingCommand)
.info("Forwarding thing command with ID <{}> and type <{}> to 'things' shard region",
thingCommand.getEntityId(), thingCommand.getType());
transformedThingCommand.getEntityId(), transformedThingCommand.getType());
shardRegions.things()
.forward(thingCommand, getContext());
.forward(transformedThingCommand, getContext());
}

private void forwardToThingsAggregator(final Command<?> command) {
Expand Down Expand Up @@ -182,20 +189,23 @@ private void handleCreatePolicy(final CreatePolicy createPolicy) {
}

private void forwardToPolicies(final PolicyCommand<?> policyCommand) {
log.withCorrelationId(policyCommand)
final PolicyCommand<?> transformedPolicyCommand = (PolicyCommand<?>) signalTransformer.apply(policyCommand);
log.withCorrelationId(transformedPolicyCommand)
.info("Forwarding policy command with ID <{}> and type <{}> to 'policies' shard region",
policyCommand.getEntityId(), policyCommand.getType());
transformedPolicyCommand.getEntityId(), transformedPolicyCommand.getType());
shardRegions.policies()
.forward(policyCommand, getContext());
.forward(transformedPolicyCommand, getContext());
}

private void forwardToConnectivity(final ConnectivityCommand<?> connectivityCommand) {
if (connectivityCommand instanceof WithEntityId withEntityId) {
log.withCorrelationId(connectivityCommand)
final ConnectivityCommand<?> transformedConnectivityCommand =
(ConnectivityCommand<?>) signalTransformer.apply(connectivityCommand);
log.withCorrelationId(transformedConnectivityCommand)
.info("Forwarding connectivity command with ID <{}> and type <{}> to 'connections' " +
"shard region", withEntityId.getEntityId(), connectivityCommand.getType());
"shard region", withEntityId.getEntityId(), transformedConnectivityCommand.getType());
shardRegions.connections()
.forward(connectivityCommand, getContext());
.forward(transformedConnectivityCommand, getContext());
} else {
log.withCorrelationId(connectivityCommand)
.error("Could not forward ConnectivityCommand not implementing WithEntityId to 'connections' " +
Expand All @@ -211,10 +221,11 @@ private void forwardToThingSearch(final Command<?> command) {

private void handleUnknownSignal(final Signal<?> signal) {

final String signalType = signal.getType();
final DittoDiagnosticLoggingAdapter l = log.withCorrelationId(signal);
final Signal<?> transformedSignal = signalTransformer.apply(signal);
final String signalType = transformedSignal.getType();
final DittoDiagnosticLoggingAdapter l = log.withCorrelationId(transformedSignal);
l.error("Received signal <{}> which is not known how to be handled: {}",
signalType, signal);
signalType, transformedSignal);
}

}
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.service.dispatching;

import java.util.function.UnaryOperator;

import org.eclipse.ditto.base.model.signals.Signal;

import akka.actor.ActorSystem;

public class NoOpSignalTransformer implements SignalTransformer {

/**
* @param actorSystem the actor system in which to load the extension.
*/
protected NoOpSignalTransformer(final ActorSystem actorSystem) {
}

@Override
public Signal<?> apply(final Signal<?> signal) {
return UnaryOperator.<Signal<?>>identity().apply(signal);
}

}
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.service.dispatching;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.function.UnaryOperator;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.DittoExtensionPoint;

import akka.actor.ActorSystem;

public interface SignalTransformer extends DittoExtensionPoint, UnaryOperator<Signal<?>> {

/**
* Loads the implementation of {@code SignalTransformer} which is configured for the
* {@code ActorSystem}.
*
* @param actorSystem the actorSystem in which the {@code SignalTransformer} should be loaded.
* @return the {@code SignalTransformer} implementation.
* @throws NullPointerException if {@code actorSystem} is {@code null}.
*/
static SignalTransformer get(final ActorSystem actorSystem) {
checkNotNull(actorSystem, "actorSystem");
return ExtensionId.INSTANCE.get(actorSystem);
}

final class ExtensionId extends DittoExtensionPoint.ExtensionId<SignalTransformer> {

private static final String CONFIG_PATH = "ditto.signal-transformer";
private static final ExtensionId INSTANCE = new ExtensionId(SignalTransformer.class);

private ExtensionId(final Class<SignalTransformer> parentClass) {
super(parentClass);
}

@Override
protected String getConfigPath() {
return CONFIG_PATH;
}

}
}
Expand Up @@ -2,3 +2,5 @@ ditto.entity-creation.default-namespace-provider = org.eclipse.ditto.edge.servic
ditto.entity-creation.default-namespace-provider = ${?DITTO_DEFAULT_NAMESPACE_PROVIDER}
ditto.edge-command-forwarder-extension = org.eclipse.ditto.edge.service.dispatching.NoOpEdgeCommandForwarderExtension
ditto.edge-command-forwarder-extension = ${?DITTO_EDGE_COMMAND_FORWARDER_EXTENSION}
ditto.signal-transformer = "org.eclipse.ditto.edge.service.dispatching.NoOpSignalTransformer"
ditto.signal-transformer = ${?DITTO_SIGNAL_TRANSFORMER}
2 changes: 1 addition & 1 deletion gateway/service/src/main/resources/gateway.conf
Expand Up @@ -538,7 +538,7 @@ akka {
}

include "ditto-protocol-subscriber.conf"
include "ditto-edge-api.conf"
include "ditto-edge-service.conf"

authentication-dispatcher {
type = Dispatcher
Expand Down

0 comments on commit d2cbef7

Please sign in to comment.