Skip to content

Commit

Permalink
Use SignalTransformer to append default namespace to relevant commands
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed May 30, 2022
1 parent d2cbef7 commit 8843572
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceOperationsActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceStreamingActorCreator;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionSupervisorActor;
import org.eclipse.ditto.edge.service.dispatching.DefaultNamespaceProvider;
import org.eclipse.ditto.edge.service.dispatching.EdgeCommandForwarderActor;
import org.eclipse.ditto.edge.service.dispatching.ShardRegions;
import org.eclipse.ditto.edge.service.dispatching.SignalTransformer;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.ClusterUtil;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
Expand Down Expand Up @@ -170,10 +170,10 @@ private ActorRef getHealthCheckingActor(final ConnectivityConfig connectivityCon
}

private ActorRef getCommandForwarder(final ClusterConfig clusterConfig, final ActorRef pubSubMediator) {
final DefaultNamespaceProvider defaultNamespaceProvider = DefaultNamespaceProvider.get(getContext().system());
final SignalTransformer signalTransformer = SignalTransformer.get(getContext().system());
return startChildActor(EdgeCommandForwarderActor.ACTOR_NAME,
EdgeCommandForwarderActor.props(pubSubMediator,
ShardRegions.of(getContext().getSystem(), clusterConfig), defaultNamespaceProvider));
ShardRegions.of(getContext().getSystem(), clusterConfig), signalTransformer));
}

private static ActorRef startConnectionShardRegion(final ActorSystem actorSystem,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.service.dispatching;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.policies.enforcement.config.DefaultEntityCreationConfig;
import org.eclipse.ditto.policies.enforcement.config.EntityCreationConfig;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;

import akka.actor.ActorSystem;
import akka.japi.pf.PFBuilder;
import scala.PartialFunction;

public class DefaultNamespaceAppender implements SignalTransformer {

private final String defaultNamespace;
private final PartialFunction<Signal<?>, Signal<?>> signalTransformer;

/**
* @param actorSystem the actor system in which to load the extension.
*/
protected DefaultNamespaceAppender(final ActorSystem actorSystem) {
final EntityCreationConfig entityCreationConfig = DefaultEntityCreationConfig.of(
DefaultScopedConfig.dittoScoped(actorSystem.settings().config())
);
defaultNamespace = entityCreationConfig.getDefaultNamespace();
signalTransformer = new PFBuilder<Signal<?>, Signal<?>>()
.match(CreateThing.class, this::handleCreateThing)
.match(CreatePolicy.class, this::handleCreatePolicy)
.matchAny(signal -> signal)
.build();
}

public DefaultNamespaceAppender(final String defaultNamespace) {
this.defaultNamespace = defaultNamespace;
signalTransformer = new PFBuilder<Signal<?>, Signal<?>>()
.match(CreateThing.class, this::handleCreateThing)
.match(CreatePolicy.class, this::handleCreatePolicy)
.matchAny(signal -> signal)
.build();
}

@Override
public CompletionStage<Signal<?>> apply(final Signal<?> signal) {
return CompletableFuture.completedStage(signalTransformer.apply(signal));
}

private CreateThing handleCreateThing(final CreateThing createThing) {
final Optional<ThingId> providedThingId = createThing.getThing().getEntityId();
final ThingId namespacedThingId = providedThingId
.map(thingId -> {
if (thingId.getNamespace().isEmpty()) {
return ThingId.of(defaultNamespace, thingId.getName());
} else {
return thingId;
}
})
.orElseGet(() -> ThingId.inNamespaceWithRandomName(defaultNamespace));
final Thing thingWithNamespacedId = createThing.getThing().toBuilder().setId(namespacedThingId).build();
@Nullable final JsonObject initialPolicy = createThing.getInitialPolicy().orElse(null);
@Nullable final String policyIdOrPlaceholder = createThing.getPolicyIdOrPlaceholder().orElse(null);
final DittoHeaders dittoHeaders = createThing.getDittoHeaders();
return CreateThing.of(thingWithNamespacedId, initialPolicy, policyIdOrPlaceholder, dittoHeaders);
}

private CreatePolicy handleCreatePolicy(final CreatePolicy createPolicy) {
final Optional<PolicyId> providedPolicyId = createPolicy.getPolicy().getEntityId();
final PolicyId namespacedPolicyId = providedPolicyId
.map(policyId -> {
if (policyId.getNamespace().isEmpty()) {
return PolicyId.of(defaultNamespace, policyId.getName());
} else {
return policyId;
}
})
.orElseGet(() -> PolicyId.inNamespaceWithRandomName(defaultNamespace));
final Policy policyWithNamespacedId = createPolicy.getPolicy().toBuilder().setId(namespacedPolicyId).build();
final DittoHeaders dittoHeaders = createPolicy.getDittoHeaders();
return CreatePolicy.of(policyWithNamespacedId, dittoHeaders);
}

}

This file was deleted.

Loading

0 comments on commit 8843572

Please sign in to comment.