Skip to content

Commit

Permalink
added SudoQueryCommandResponse with is a SudoCommandResponse containi…
Browse files Browse the repository at this point in the history
…ng an entity

* adjusted some existing SudoCommandResponses to using that new interface
* made DeleteExpiredSubject an internal SudoDeleteExpiredSubject - it is not a part of the model API
* moved PolicyReferenceTag to search "api" module + add it to mapping strategies for search, making it deserializable
* fixed Acknowledgement aggregator starting to also start for "delete" and "merge" command categories containing ack requests
* simplified mapping strategies, using method reference lambda

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 28, 2022
1 parent 50c4e76 commit 16cfae2
Show file tree
Hide file tree
Showing 32 changed files with 151 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.WithEntity;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;

/**
* Aggregates all sudo command responses in Ditto.
*
* @param <T> the type of the implementing class.
*/
public interface SudoCommandResponse<T extends SudoCommandResponse<T>> extends CommandResponse<T>, WithEntity<T> {
public interface SudoCommandResponse<T extends SudoCommandResponse<T>> extends CommandResponse<T> {

/**
* Type qualifier of sudo command responses.
Expand All @@ -37,9 +35,6 @@ default JsonPointer getResourcePath() {
return JsonPointer.empty();
}

@Override
T setEntity(JsonValue entity);

@Override
T setDittoHeaders(DittoHeaders dittoHeaders);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.api.commands.sudo;

import org.eclipse.ditto.base.model.signals.commands.WithEntity;
import org.eclipse.ditto.json.JsonValue;

/**
* Aggregates all sudo "query" command responses in Ditto.
*
* @param <T> the type of the implementing class.
*/
public interface SudoQueryCommandResponse<T extends SudoQueryCommandResponse<T>> extends SudoCommandResponse<T>,
WithEntity<T> {

@Override
T setEntity(JsonValue entity);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package org.eclipse.ditto.base.api.persistence.cleanup;

import org.eclipse.ditto.base.api.commands.sudo.SudoCommandResponse;
import org.eclipse.ditto.base.api.commands.sudo.SudoQueryCommandResponse;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
Expand All @@ -27,7 +27,7 @@
*
* @param <T> the type of the implementing class.
*/
public interface CleanupCommandResponse<T extends CleanupCommandResponse<T>> extends SudoCommandResponse<T>,
public interface CleanupCommandResponse<T extends CleanupCommandResponse<T>> extends SudoQueryCommandResponse<T>,
SignalWithEntityId<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,14 @@ enum Category {
ACTION;

/**
* Determines whether the passed {@code category} is CREATE or MODIFY.
* Determines whether the passed {@code category} effectively modifies the targeted entity.
*
* @param category the category to check.
* @return whether the passed {@code category} is CREATE or MODIFY
* @return whether the passed {@code category} effectively modifies the targeted entity
* @since 3.0.0
*/
public static boolean isCreateOrModify(final Category category) {
return category == CREATE || category == MODIFY;
public static boolean isEntityModifyingCommand(final Category category) {
return category == CREATE || category == MODIFY || category == MERGE || category == DELETE;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,24 @@ private static MappingStrategies getConnectivityMappingStrategies() {
.add(GlobalCommandResponseRegistry.getInstance())
.add(GlobalEventRegistry.getInstance())
.add(GlobalErrorRegistry.getInstance())
.add(Connection.class, jsonObject -> ConnectivityModelFactory.connectionFromJson(jsonObject)) // do not replace with lambda!
.add("ImmutableConnection", jsonObject -> ConnectivityModelFactory.connectionFromJson(jsonObject)) // do not replace with lambda!
.add(ResourceStatus.class, jsonObject -> ConnectivityModelFactory.resourceStatusFromJson(jsonObject)) // do not replace with lambda!
.add("ImmutableResourceStatus", jsonObject -> ConnectivityModelFactory.resourceStatusFromJson(jsonObject)) // do not replace with lambda!
.add(ConnectivityStatus.class,
jsonObject -> ConnectivityStatus.fromJson(jsonObject)) // do not replace with lambda!
.add(BaseClientState.class,
jsonObject -> BaseClientState.fromJson(jsonObject)) // do not replace with lambda!
.add(ConnectionTag.class,
jsonObject -> ConnectionTag.fromJson(jsonObject)) // do not replace with lambda!
.add(Connection.class, ConnectivityModelFactory::connectionFromJson)
.add("ImmutableConnection", ConnectivityModelFactory::connectionFromJson)
.add(ResourceStatus.class, ConnectivityModelFactory::resourceStatusFromJson)
.add("ImmutableResourceStatus", ConnectivityModelFactory::resourceStatusFromJson)
.add(ConnectivityStatus.class, ConnectivityStatus::fromJson)
.add(BaseClientState.class, BaseClientState::fromJson)
.add(ConnectionTag.class, ConnectionTag::fromJson)
.add(BatchedEntityIdWithRevisions.typeOf(ConnectionTag.class),
BatchedEntityIdWithRevisions.deserializer(jsonObject -> ConnectionTag.fromJson(jsonObject)))
BatchedEntityIdWithRevisions.deserializer(ConnectionTag::fromJson))
.build();

final MappingStrategies specialStrategies = MappingStrategiesBuilder.newInstance()
.add(OutboundSignal.class,
jsonObject -> OutboundSignalFactory.outboundSignalFromJson(jsonObject, strategies)) // do not replace with lambda!
jsonObject -> OutboundSignalFactory.outboundSignalFromJson(jsonObject, strategies))
.add(InboundSignal.class, jsonObject -> InboundSignal.fromJson(jsonObject, strategies))
.add("UnmappedOutboundSignal",
jsonObject -> OutboundSignalFactory.outboundSignalFromJson(jsonObject, strategies))
.build();// do not replace with lambda!
.build();

return MappingStrategiesBuilder.newInstance()
.putAll(strategies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ public static boolean shouldStartForIncoming(final Signal<?> signal) {
final Collection<AcknowledgementRequest> ackRequests = signal.getDittoHeaders().getAcknowledgementRequests();
if (signal instanceof Announcement<?>) {
result = !ackRequests.isEmpty();
} else if (signal instanceof Command<?> command && Command.Category.isCreateOrModify(command.getCategory()) &&
} else if (signal instanceof Command<?> command &&
Command.Category.isEntityModifyingCommand(command.getCategory()) &&
!isLiveSignal) {
result = ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotLiveResponse);
} else if (Command.isMessageCommand(signal) || isLiveSignal && Command.isThingCommand(signal)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import org.eclipse.ditto.base.model.signals.JsonParsable;
import org.eclipse.ditto.base.model.signals.JsonParsableRegistry;
import org.eclipse.ditto.base.model.signals.ShardedMessageEnvelope;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.internal.utils.akka.PingCommand;
import org.eclipse.ditto.internal.utils.akka.PingCommandResponse;
import org.eclipse.ditto.internal.utils.akka.SimpleCommand;
import org.eclipse.ditto.internal.utils.akka.SimpleCommandResponse;
import org.eclipse.ditto.internal.utils.akka.streaming.StreamAck;
import org.eclipse.ditto.internal.utils.health.StatusInfo;
import org.eclipse.ditto.json.JsonObject;

/**
* A mutable builder with a fluent API for a Map containing mapping strategies. This builder mainly exists to eliminate
Expand Down Expand Up @@ -62,18 +62,12 @@ public static MappingStrategiesBuilder newInstance() {

// add the commonly known types:
builder.add(DittoHeaders.class, jsonObject -> DittoHeaders.newBuilder(jsonObject).build());
builder.add(ShardedMessageEnvelope.class,
jsonObject -> ShardedMessageEnvelope.fromJson(jsonObject)); // do not replace with lambda!
builder.add(SimpleCommand.class,
jsonObject -> SimpleCommand.fromJson(jsonObject)); // do not replace with lambda!
builder.add(SimpleCommandResponse.class,
jsonObject -> SimpleCommandResponse.fromJson(jsonObject)); // do not replace with lambda!
builder.add(PingCommand.class,
jsonObject -> PingCommand.fromJson(jsonObject)); // do not replace with lambda!
builder.add(PingCommandResponse.class,
jsonObject -> PingCommandResponse.fromJson(jsonObject)); // do not replace with lambda!
builder.add(StatusInfo.class,
jsonObject -> StatusInfo.fromJson(jsonObject)); // do not replace with lambda!
builder.add(ShardedMessageEnvelope.class, ShardedMessageEnvelope::fromJson);
builder.add(SimpleCommand.class, SimpleCommand::fromJson);
builder.add(SimpleCommandResponse.class, SimpleCommandResponse::fromJson);
builder.add(PingCommand.class, PingCommand::fromJson);
builder.add(PingCommandResponse.class, PingCommandResponse::fromJson);
builder.add(StatusInfo.class, StatusInfo::fromJson);
builder.add(StreamAck.class, StreamAck::fromJson);

return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
*/
package org.eclipse.ditto.policies.api.commands.sudo;

import org.eclipse.ditto.base.api.commands.sudo.SudoCommandResponse;
import org.eclipse.ditto.base.api.commands.sudo.SudoQueryCommandResponse;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.WithEntity;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonValue;
Expand All @@ -27,8 +26,8 @@
*
* @param <T> the type of the implementing class.
*/
public interface PolicySudoCommandResponse<T extends PolicySudoCommandResponse<T>> extends SudoCommandResponse<T>,
WithEntity<T> {
public interface PolicySudoQueryCommandResponse<T extends PolicySudoQueryCommandResponse<T>> extends
SudoQueryCommandResponse<T> {

/**
* Type Prefix of Policy sudo command responses.
Expand All @@ -47,7 +46,7 @@ default String getResourceType() {
T setDittoHeaders(DittoHeaders dittoHeaders);

/**
* An enumeration of the known {@link org.eclipse.ditto.json.JsonField}s of a {@link PolicySudoCommandResponse}.
* An enumeration of the known {@link org.eclipse.ditto.json.JsonField}s of a {@link PolicySudoQueryCommandResponse}.
*/
class JsonFields extends CommandResponse.JsonFields {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
@Immutable
@JsonParsableCommandResponse(type = SudoRetrievePolicyResponse.TYPE)
public final class SudoRetrievePolicyResponse extends AbstractCommandResponse<SudoRetrievePolicyResponse>
implements PolicySudoCommandResponse<SudoRetrievePolicyResponse>, SignalWithEntityId<SudoRetrievePolicyResponse> {
implements PolicySudoQueryCommandResponse<SudoRetrievePolicyResponse>, SignalWithEntityId<SudoRetrievePolicyResponse> {

/**
* Type of this response.
Expand All @@ -62,7 +62,7 @@ public final class SudoRetrievePolicyResponse extends AbstractCommandResponse<Su
context -> {
final var jsonObject = context.getJsonObject();
return new SudoRetrievePolicyResponse(
PolicyId.of(jsonObject.getValueOrThrow(PolicySudoCommandResponse.JsonFields.JSON_POLICY_ID)),
PolicyId.of(jsonObject.getValueOrThrow(PolicySudoQueryCommandResponse.JsonFields.JSON_POLICY_ID)),
context.getDeserializedHttpStatus(),
jsonObject.getValueOrThrow(JSON_POLICY),
context.getDittoHeaders()
Expand Down Expand Up @@ -188,7 +188,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
final Predicate<JsonField> thePredicate) {

final var predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set(PolicySudoCommandResponse.JsonFields.JSON_POLICY_ID, String.valueOf(policyId), predicate);
jsonObjectBuilder.set(PolicySudoQueryCommandResponse.JsonFields.JSON_POLICY_ID, String.valueOf(policyId), predicate);
jsonObjectBuilder.set(JSON_POLICY, policy, predicate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
@JsonParsableCommandResponse(type = SudoRetrievePolicyRevisionResponse.TYPE)
public final class SudoRetrievePolicyRevisionResponse
extends AbstractCommandResponse<SudoRetrievePolicyRevisionResponse>
implements PolicySudoCommandResponse<SudoRetrievePolicyRevisionResponse>,
implements PolicySudoQueryCommandResponse<SudoRetrievePolicyRevisionResponse>,
SignalWithEntityId<SudoRetrievePolicyRevisionResponse> {

/**
Expand All @@ -62,7 +62,7 @@ public final class SudoRetrievePolicyRevisionResponse
context -> {
final var jsonObject = context.getJsonObject();
return new SudoRetrievePolicyRevisionResponse(
PolicyId.of(jsonObject.getValueOrThrow(PolicySudoCommandResponse.JsonFields.JSON_POLICY_ID)),
PolicyId.of(jsonObject.getValueOrThrow(PolicySudoQueryCommandResponse.JsonFields.JSON_POLICY_ID)),
jsonObject.getValueOrThrow(JSON_REVISION),
context.getDeserializedHttpStatus(),
context.getDittoHeaders()
Expand Down Expand Up @@ -170,7 +170,7 @@ protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder,
final Predicate<JsonField> thePredicate) {

final var predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set(PolicySudoCommandResponse.JsonFields.JSON_POLICY_ID, String.valueOf(policyId), predicate);
jsonObjectBuilder.set(PolicySudoQueryCommandResponse.JsonFields.JSON_POLICY_ID, String.valueOf(policyId), predicate);
jsonObjectBuilder.set(JSON_REVISION, revision, predicate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
public final class SudoRetrievePolicyResponseTest {

private static final JsonObject KNOWN_JSON = JsonFactory.newObjectBuilder() //
.set(PolicySudoCommandResponse.JsonFields.TYPE, SudoRetrievePolicyResponse.TYPE) //
.set(PolicySudoCommandResponse.JsonFields.STATUS, HttpStatus.OK.getCode()) //
.set(PolicySudoCommandResponse.JsonFields.JSON_POLICY_ID, TestConstants.Policy.POLICY_ID.toString()) //
.set(PolicySudoQueryCommandResponse.JsonFields.TYPE, SudoRetrievePolicyResponse.TYPE) //
.set(PolicySudoQueryCommandResponse.JsonFields.STATUS, HttpStatus.OK.getCode()) //
.set(PolicySudoQueryCommandResponse.JsonFields.JSON_POLICY_ID, TestConstants.Policy.POLICY_ID.toString()) //
.set(SudoRetrievePolicyResponse.JSON_POLICY,
TestConstants.Policy.POLICY.toJson(FieldType.regularOrSpecial())) //
.build();
Expand Down
1 change: 1 addition & 0 deletions policies/model/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
<!-- Don't add excludes here before checking with the whole Ditto team -->
<!-- <exclude></exclude> -->
<exclude>org.eclipse.ditto.policies.model.signals.commands.PolicyCommandSizeValidator#DITTO_LIMITS_POLICIES_MAX_SIZE_BYTES</exclude>
<exclude>org.eclipse.ditto.policies.model.signals.commands.modify.DeleteExpiredSubject</exclude>
<exclude>org.eclipse.ditto.policies.model.enforcers.PolicyEnforcers#defaultEvaluator(org.eclipse.ditto.policies.model.Policy)</exclude>
<exclude>org.eclipse.ditto.policies.model.enforcers.PolicyEnforcers#memoryOptimizedEvaluator(org.eclipse.ditto.policies.model.Policy)</exclude>
<exclude>org.eclipse.ditto.policies.model.enforcers.PolicyEnforcers#throughputOptimizedEvaluator(org.eclipse.ditto.policies.model.Policy)</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.eclipse.ditto.policies.model.SubjectExpiry;
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.policies.model.signals.announcements.SubjectDeletionAnnouncement;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeleteExpiredSubject;
import org.eclipse.ditto.policies.service.common.config.PolicyAnnouncementConfig;
import org.eclipse.ditto.policies.service.persistence.actors.strategies.commands.SudoDeleteExpiredSubject;

import akka.NotUsed;
import akka.actor.AbstractFSM;
Expand Down Expand Up @@ -73,7 +73,7 @@ public final class SubjectExpiryActor extends AbstractFSM<SubjectExpiryState, No
private final Duration gracePeriod;
private final DistributedPub<PolicyAnnouncement<?>> policyAnnouncementPub;
private final AcknowledgementAggregatorActorStarter ackregatorStarter;
private final DeleteExpiredSubject deleteExpiredSubject;
private final SudoDeleteExpiredSubject sudoDeleteExpiredSubject;
private final Duration persistenceTimeout;
private final ActorRef commandForwarder;
private final boolean enableAnnouncementsWhenDeleted;
Expand Down Expand Up @@ -105,8 +105,8 @@ private SubjectExpiryActor(final PolicyId policyId,
AcknowledgementAggregatorActorStarter.of(getContext(), maxTimeout, HeaderTranslator.empty(),
null, List.of(), List.of());
this.commandForwarder = commandForwarder;
deleteExpiredSubject =
DeleteExpiredSubject.of(policyId, subject, DittoHeaders.newBuilder().responseRequired(false).build());
sudoDeleteExpiredSubject =
SudoDeleteExpiredSubject.of(policyId, subject, DittoHeaders.newBuilder().responseRequired(false).build());
persistenceTimeout = maxTimeout;

final var backOffConfig = config.getExponentialBackOffConfig();
Expand Down Expand Up @@ -314,7 +314,7 @@ private State<SubjectExpiryState, NotUsed> timeoutInDeleted(final StateTimeout$
return stop();
} else {
// retry deletion
commandForwarder.tell(deleteExpiredSubject, ActorRef.noSender());
commandForwarder.tell(sudoDeleteExpiredSubject, ActorRef.noSender());

return goTo(DELETED);
}
Expand Down Expand Up @@ -360,7 +360,7 @@ private State<SubjectExpiryState, NotUsed> retryAnnouncementAfterBackOff(final D
} else {
// outside of grace period; delete
l.info("Grace period past for subject <{}>. Deleting.", subject);
commandForwarder.tell(deleteExpiredSubject, ActorRef.noSender());
commandForwarder.tell(sudoDeleteExpiredSubject, ActorRef.noSender());

return goTo(DELETED);
}
Expand Down Expand Up @@ -404,7 +404,7 @@ private SubjectExpiryState scheduleDeleteExpiredSubject(final SubjectExpiry expi
}

private void doDelete() {
commandForwarder.tell(deleteExpiredSubject, ActorRef.noSender());
commandForwarder.tell(sudoDeleteExpiredSubject, ActorRef.noSender());
cancelTimer(Message.DELETE.name());
}

Expand Down
Loading

0 comments on commit 16cfae2

Please sign in to comment.