Skip to content

Commit

Permalink
Add extension to allow adding custom command forwarding
Browse files Browse the repository at this point in the history
e.g. for commands issued by a route which is provided by a CustomApiRoutesProvider

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed May 25, 2022
1 parent ab05043 commit 9399a32
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 2 deletions.
1 change: 1 addition & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,7 @@ akka {
}

include "ditto-protocol-subscriber.conf"
include "ditto-edge-api.conf"

akka-contrib-mongodb-persistence-connection-journal {
class = "akka.contrib.persistence.mongodb.MongoJournal"
Expand Down
1 change: 0 additions & 1 deletion edge/api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-thingsearch-model</artifactId>
</dependency>

<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-thingsearch-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ public static Props props(final ActorRef pubSubMediator, final ShardRegions shar

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
final Receive receiveExtension = EdgeCommandForwarderExtension.get(context().system())
.getReceiveExtension(getContext());

final Receive forwardingReceive = ReceiveBuilder.create()
.match(MessageCommand.class, this::forwardToThings)
.match(ThingCommand.class, this::forwardToThings)
.match(RetrieveThings.class, this::forwardToThingsAggregator)
Expand All @@ -92,6 +95,8 @@ public Receive createReceive() {
)
.matchAny(m -> log.warning("Got unknown message: {}", m))
.build();

return receiveExtension.orElse(forwardingReceive);
}

private void forwardToThings(final MessageCommand<?, ?> messageCommand) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.api.dispatching;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionPoint;

import akka.actor.AbstractActor;
import akka.actor.ActorSystem;

/**
* This extension allows to extend commands that are forwarded from the edges of ditto to other microservices by
* handling them in the {@link AbstractActor.Receive} provided by {@link #getReceiveExtension(akka.actor.AbstractActor.ActorContext)}.
*/
public interface EdgeCommandForwarderExtension extends DittoExtensionPoint {

static EdgeCommandForwarderExtension get(final ActorSystem actorSystem) {
checkNotNull(actorSystem, "actorSystem");
return ExtensionId.INSTANCE.get(actorSystem);
}

/**
* Builds the receive extension to allow custom handling of messages.
* This receive will be applied BEFORE the default receives of {@link EdgeCommandForwarderActor}, so it's possible
* to overwrite the default handling.
*
* @param actorContext can be used for example to determine the original sender of a message.
* @return The desired receive extension.
*/
AbstractActor.Receive getReceiveExtension(AbstractActor.ActorContext actorContext);

final class ExtensionId extends DittoExtensionPoint.ExtensionId<EdgeCommandForwarderExtension> {

private static final String CONFIG_PATH = "ditto.edge-command-forwarder-extension";
private static final ExtensionId INSTANCE = new ExtensionId(EdgeCommandForwarderExtension.class);

private ExtensionId(final Class<EdgeCommandForwarderExtension> parentClass) {
super(parentClass);
}

@Override
protected String getConfigPath() {
return CONFIG_PATH;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.api.dispatching;

import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;

final class NoOpEdgeCommandForwarderExtension implements EdgeCommandForwarderExtension{

@Override
public AbstractActor.Receive getReceiveExtension(final AbstractActor.ActorContext actorContext) {
return ReceiveBuilder.create().build();
}

}
2 changes: 2 additions & 0 deletions edge/api/src/main/resources/ditto-edge-api.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ditto.edge-command-forwarder-extension = org.eclipse.ditto.edge.api.dispatching.NoOpEdgeCommandForwarderExtension
ditto.edge-command-forwarder-extension = ${?DITTO_EDGE_COMMAND_FORWARDER_EXTENSION}
1 change: 1 addition & 0 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ akka {
}

include "ditto-protocol-subscriber.conf"
include "ditto-edge-api.conf"

authentication-dispatcher {
type = Dispatcher
Expand Down

0 comments on commit 9399a32

Please sign in to comment.