Skip to content

Commit

Permalink
Use Extension to determine default namespace for entity creation
Browse files Browse the repository at this point in the history
* Moved logic out of model. Handling of resolving the default namespace
  is only relevant in edge services and from then on the namespace counts
  as resolved

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed May 30, 2022
1 parent 061fa0f commit 82b4fe9
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ public final class DittoSystemProperties {
*/
public static final String DITTO_LIMITS_MESSAGES_MAX_SIZE_BYTES = "ditto.limits.messages.max-size";

/**
* System property name of the property defining the default namespace to use when creating new entities.
*/
public static final String DITTO_ENTITY_CREATION_DEFAULT_NAMESPACE = "ditto.entity-creation.default-namespace";

private DittoSystemProperties() {
throw new AssertionError();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceOperationsActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceStreamingActorCreator;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionSupervisorActor;
import org.eclipse.ditto.edge.api.dispatching.DefaultNamespaceProvider;
import org.eclipse.ditto.edge.api.dispatching.EdgeCommandForwarderActor;
import org.eclipse.ditto.edge.api.dispatching.ShardRegions;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
Expand Down Expand Up @@ -169,10 +170,10 @@ private ActorRef getHealthCheckingActor(final ConnectivityConfig connectivityCon
}

private ActorRef getCommandForwarder(final ClusterConfig clusterConfig, final ActorRef pubSubMediator) {

final DefaultNamespaceProvider defaultNamespaceProvider = DefaultNamespaceProvider.get(getContext().system());
return startChildActor(EdgeCommandForwarderActor.ACTOR_NAME,
EdgeCommandForwarderActor.props(pubSubMediator,
ShardRegions.of(getContext().getSystem(), clusterConfig)));
ShardRegions.of(getContext().getSystem(), clusterConfig), defaultNamespaceProvider));
}

private static ActorRef startConnectionShardRegion(final ActorSystem actorSystem,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.api.dispatching;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import org.eclipse.ditto.base.service.DittoExtensionPoint;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;

import akka.actor.ActorSystem;

/**
* Determines the default namespace based on the given signal.
*/
public interface DefaultNamespaceProvider extends DittoExtensionPoint {

/**
* @param createThing The command that requires a default namespace.
* @return The default namespace.
*/
String getDefaultNamespace(CreateThing createThing);

/**
* @param createPolicy The command that requires a default namespace.
* @return The default namespace.
*/
String getDefaultNamespace(CreatePolicy createPolicy);

static DefaultNamespaceProvider get(final ActorSystem actorSystem) {
checkNotNull(actorSystem, "actorSystem");
return ExtensionId.INSTANCE.get(actorSystem);
}

final class ExtensionId extends DittoExtensionPoint.ExtensionId<DefaultNamespaceProvider> {

private static final String CONFIG_PATH = "ditto.entity-creation.default-namespace-provider";
private static final ExtensionId INSTANCE = new ExtensionId(DefaultNamespaceProvider.class);

private ExtensionId(final Class<DefaultNamespaceProvider> parentClass) {
super(parentClass);
}

@Override
protected String getConfigPath() {
return CONFIG_PATH;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,30 @@
*/
package org.eclipse.ditto.edge.api.dispatching;

import java.util.Optional;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.signals.commands.PolicyCommand;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.things.api.ThingsMessagingConstants;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThings;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThings;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;
import org.eclipse.ditto.thingsearch.api.commands.sudo.ThingSearchSudoCommand;
Expand Down Expand Up @@ -52,11 +64,15 @@ public class EdgeCommandForwarderActor extends AbstractActor {
private final ActorRef pubSubMediator;
private final ShardRegions shardRegions;

private final DefaultNamespaceProvider defaultNamespaceProvider;

@SuppressWarnings("unused")
private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegions shardRegions) {
private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegions shardRegions,
final DefaultNamespaceProvider defaultNamespaceProvider) {

this.pubSubMediator = pubSubMediator;
this.shardRegions = shardRegions;
this.defaultNamespaceProvider = defaultNamespaceProvider;
}

/**
Expand All @@ -66,8 +82,9 @@ private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegi
* @param shardRegions shard regions to use in order to dispatch different entity Signals to.
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef pubSubMediator, final ShardRegions shardRegions) {
return Props.create(EdgeCommandForwarderActor.class, pubSubMediator, shardRegions);
public static Props props(final ActorRef pubSubMediator, final ShardRegions shardRegions,
final DefaultNamespaceProvider defaultNamespaceProvider) {
return Props.create(EdgeCommandForwarderActor.class, pubSubMediator, shardRegions, defaultNamespaceProvider);
}

@Override
Expand All @@ -77,9 +94,11 @@ public Receive createReceive() {

final Receive forwardingReceive = ReceiveBuilder.create()
.match(MessageCommand.class, this::forwardToThings)
.match(CreateThing.class, this::handleCreateThing)
.match(ThingCommand.class, this::forwardToThings)
.match(RetrieveThings.class, this::forwardToThingsAggregator)
.match(SudoRetrieveThings.class, this::forwardToThingsAggregator)
.match(CreatePolicy.class, this::handleCreatePolicy)
.match(PolicyCommand.class, this::forwardToPolicies)
.match(ConnectivityCommand.class, this::forwardToConnectivity)
.match(ThingSearchCommand.class, this::forwardToThingSearch)
Expand All @@ -95,6 +114,30 @@ public Receive createReceive() {
return receiveExtension.orElse(forwardingReceive);
}

private void handleCreateThing(final CreateThing createThing) {
final Optional<ThingId> providedThingId = createThing.getThing().getEntityId();
final ThingId namespacedThingId = providedThingId
.map(thingId -> {
if (thingId.getNamespace().isEmpty()) {
final String defaultNamespace = defaultNamespaceProvider.getDefaultNamespace(createThing);
return ThingId.of(defaultNamespace, thingId.getName());
} else {
return thingId;
}
})
.orElseGet(() -> {
final String defaultNamespace = defaultNamespaceProvider.getDefaultNamespace(createThing);
return ThingId.inNamespaceWithRandomName(defaultNamespace);
});
final Thing thingWithNamespacedId = createThing.getThing().toBuilder().setId(namespacedThingId).build();
@Nullable final JsonObject initialPolicy = createThing.getInitialPolicy().orElse(null);
@Nullable final String policyIdOrPlaceholder = createThing.getPolicyIdOrPlaceholder().orElse(null);
final CreateThing createThingWithNamespace =
CreateThing.of(thingWithNamespacedId, initialPolicy, policyIdOrPlaceholder,
createThing.getDittoHeaders());
forwardToThings(createThingWithNamespace);
}

private void forwardToThings(final MessageCommand<?, ?> messageCommand) {
log.withCorrelationId(messageCommand)
.info("Forwarding message command with ID <{}> and type <{}> to 'things' shard region",
Expand All @@ -117,6 +160,27 @@ private void forwardToThingsAggregator(final Command<?> command) {
pubSubMediator.forward(pubSubMsg, getContext());
}

private void handleCreatePolicy(final CreatePolicy createPolicy) {
final Optional<PolicyId> providedPolicyId = createPolicy.getPolicy().getEntityId();
final PolicyId namespacedPolicyId = providedPolicyId
.map(policyId -> {
if (policyId.getNamespace().isEmpty()) {
final String defaultNamespace = defaultNamespaceProvider.getDefaultNamespace(createPolicy);
return PolicyId.of(defaultNamespace, policyId.getName());
} else {
return policyId;
}
})
.orElseGet(() -> {
final String defaultNamespace = defaultNamespaceProvider.getDefaultNamespace(createPolicy);
return PolicyId.inNamespaceWithRandomName(defaultNamespace);
});
final Policy policyWithNamespacedId = createPolicy.getPolicy().toBuilder().setId(namespacedPolicyId).build();
final DittoHeaders dittoHeaders = createPolicy.getDittoHeaders();
final CreatePolicy createPolicyWithNamespace = CreatePolicy.of(policyWithNamespacedId, dittoHeaders);
forwardToPolicies(createPolicyWithNamespace);
}

private void forwardToPolicies(final PolicyCommand<?> policyCommand) {
log.withCorrelationId(policyCommand)
.info("Forwarding policy command with ID <{}> and type <{}> to 'policies' shard region",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.api.dispatching;

import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.policies.enforcement.config.DefaultEntityCreationConfig;
import org.eclipse.ditto.policies.enforcement.config.EntityCreationConfig;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;

import akka.actor.ActorSystem;

public final class StaticDefaultNamespaceProvider implements DefaultNamespaceProvider {

private final String defaultNamespace;

public StaticDefaultNamespaceProvider(final ActorSystem actorSystem) {
final EntityCreationConfig entityCreationConfig = DefaultEntityCreationConfig.of(
DefaultScopedConfig.dittoScoped(actorSystem.settings().config())
);
defaultNamespace = entityCreationConfig.getDefaultNamespace();
}

@Override
public String getDefaultNamespace(final CreateThing createThing) {
return defaultNamespace;
}

@Override
public String getDefaultNamespace(final CreatePolicy createPolicy) {
return defaultNamespace;
}

}
2 changes: 2 additions & 0 deletions edge/api/src/main/resources/ditto-edge-api.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
ditto.entity-creation.default-namespace-provider = org.eclipse.ditto.edge.api.dispatching.StaticDefaultNamespaceProvider
ditto.entity-creation.default-namespace-provider = ${?DITTO_DEFAULT_NAMESPACE_PROVIDER}
ditto.edge-command-forwarder-extension = org.eclipse.ditto.edge.api.dispatching.NoOpEdgeCommandForwarderExtension
ditto.edge-command-forwarder-extension = ${?DITTO_EDGE_COMMAND_FORWARDER_EXTENSION}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.ditto.base.service.actors.DittoRootActor;
import org.eclipse.ditto.base.service.config.limits.LimitsConfig;
import org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants;
import org.eclipse.ditto.edge.api.dispatching.DefaultNamespaceProvider;
import org.eclipse.ditto.edge.api.dispatching.EdgeCommandForwarderActor;
import org.eclipse.ditto.edge.api.dispatching.ShardRegions;
import org.eclipse.ditto.gateway.service.endpoints.directives.auth.DevopsAuthenticationDirective;
Expand Down Expand Up @@ -113,8 +114,9 @@ private GatewayRootActor(final GatewayConfig gatewayConfig, final ActorRef pubSu
final HttpConfig httpConfig = gatewayConfig.getHttpConfig();

final ShardRegions shardRegions = ShardRegions.of(actorSystem, clusterConfig);
final DefaultNamespaceProvider defaultNamespaceProvider = DefaultNamespaceProvider.get(getContext().system());
final var edgeCommandForwarder = startChildActor(EdgeCommandForwarderActor.ACTOR_NAME,
EdgeCommandForwarderActor.props(pubSubMediator, shardRegions));
EdgeCommandForwarderActor.props(pubSubMediator, shardRegions, defaultNamespaceProvider));
final var proxyActor = startProxyActor(actorSystem, pubSubMediator, edgeCommandForwarder);

pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.common.DittoSystemProperties;
import org.eclipse.ditto.base.model.entity.id.AbstractNamespacedEntityId;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityIdInvalidException;
Expand All @@ -30,11 +29,10 @@
@TypedEntityId(type = "policy")
public final class PolicyId extends AbstractNamespacedEntityId {

private static final String DEFAULT_NAMESPACE;

static {
DEFAULT_NAMESPACE = System.getProperty(DittoSystemProperties.DITTO_ENTITY_CREATION_DEFAULT_NAMESPACE, "");
}
/**
* Will be resolved to the actual default namespace inside ditto.
*/
private static final String DEFAULT_NAMESPACE = "";

private PolicyId(final String namespace, final String policyName, final boolean shouldValidate) {
super(PolicyConstants.ENTITY_TYPE, namespace, policyName, shouldValidate);
Expand Down Expand Up @@ -106,7 +104,8 @@ public static PolicyId inDefaultNamespace(final String name) {
* @since 3.0.0
*/
public static PolicyId generateRandom() {
return wrapInPolicyIdInvalidException(() -> new PolicyId(DEFAULT_NAMESPACE, UUID.randomUUID().toString(), true));
return wrapInPolicyIdInvalidException(
() -> new PolicyId(DEFAULT_NAMESPACE, UUID.randomUUID().toString(), true));
}

private static <T> T wrapInPolicyIdInvalidException(final Supplier<T> supplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.common.DittoSystemProperties;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonParsableCommand;
Expand Down Expand Up @@ -64,12 +63,6 @@ public final class CreatePolicy extends AbstractCommand<CreatePolicy> implements
static final JsonFieldDefinition<JsonObject> JSON_POLICY =
JsonFactory.newJsonObjectFieldDefinition("policy", FieldType.REGULAR, JsonSchemaVersion.V_2);

private static final String DEFAULT_NAMESPACE;

static {
DEFAULT_NAMESPACE = System.getProperty(DittoSystemProperties.DITTO_ENTITY_CREATION_DEFAULT_NAMESPACE, "");
}

private final Policy policy;

private CreatePolicy(final Policy policy, final DittoHeaders dittoHeaders) {
Expand Down Expand Up @@ -101,7 +94,7 @@ private CreatePolicy(final Policy policy, final DittoHeaders dittoHeaders) {
* @throws org.eclipse.ditto.policies.model.PolicyIdInvalidException if the {@link org.eclipse.ditto.policies.model.Policy}'s ID is not valid.
*/
public static CreatePolicy of(final Policy policy, final DittoHeaders dittoHeaders) {
return prependDefaultNamespaceToCreatePolicy(new CreatePolicy(policy, dittoHeaders));
return new CreatePolicy(policy, dittoHeaders);
}

/**
Expand Down Expand Up @@ -137,32 +130,6 @@ public static CreatePolicy fromJson(final JsonObject jsonObject, final DittoHead
});
}

private static CreatePolicy prependDefaultNamespaceToCreatePolicy(final CreatePolicy createPolicy) {
final Policy policy = createPolicy.getPolicy();
final Optional<String> namespace = policy.getNamespace();
if (!namespace.isPresent() || namespace.get().equals("")) {
final Policy policyInDefaultNamespace = policy.toBuilder()
.setId(calculatePolicyId(createPolicy))
.build();
return new CreatePolicy(policyInDefaultNamespace, createPolicy.getDittoHeaders());
} else {
return createPolicy;
}
}

private static PolicyId calculatePolicyId(final CreatePolicy createPolicy) {
final Optional<PolicyId> providedPolicyId = createPolicy.getPolicy().getEntityId();
return providedPolicyId
.map(policyId -> {
if (policyId.getNamespace().isEmpty()) {
return PolicyId.of(DEFAULT_NAMESPACE, policyId.toString().substring(1));
} else {
return policyId;
}
})
.orElseGet(() -> PolicyId.inNamespaceWithRandomName(DEFAULT_NAMESPACE));
}

/**
* Returns the {@code Policy} to create.
*
Expand Down
Loading

0 comments on commit 82b4fe9

Please sign in to comment.