Skip to content

Commit

Permalink
[#1107] add requested-acks to subject announcements.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jul 6, 2021
1 parent a541f2f commit 4f77e5b
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,25 @@
package org.eclipse.ditto.policies.model;

import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.common.DittoDuration;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.base.model.common.DittoDuration;
import org.eclipse.ditto.json.JsonValue;

/**
* Immutable implementation of {@link SubjectAnnouncement}.
Expand All @@ -39,27 +47,41 @@ final class ImmutableSubjectAnnouncement implements SubjectAnnouncement {
@Nullable
private final DittoDuration beforeExpiry;
private final boolean whenDeleted;
private final List<AcknowledgementRequest> requestedAcksBeforeExpiry;
@Nullable private final DittoDuration requestedAcksTimeout;

ImmutableSubjectAnnouncement(@Nullable final DittoDuration beforeExpiry, final boolean whenDeleted) {
ImmutableSubjectAnnouncement(@Nullable final DittoDuration beforeExpiry, final boolean whenDeleted,
final List<AcknowledgementRequest> requestedAcksBeforeExpiry,
@Nullable final DittoDuration requestedAcksTimeout) {
this.beforeExpiry = beforeExpiry;
this.whenDeleted = whenDeleted;
this.requestedAcksBeforeExpiry = Collections.unmodifiableList(new ArrayList<>(requestedAcksBeforeExpiry));
this.requestedAcksTimeout = requestedAcksTimeout;
}

static ImmutableSubjectAnnouncement fromJson(final JsonObject jsonObject) {
final Optional<String> beforeExpiryString = jsonObject.getValue(JsonFields.BEFORE_EXPIRY);
final boolean whenDeleted = jsonObject.getValue(JsonFields.WHEN_DELETED).orElse(false);
final List<AcknowledgementRequest> requestedAcksBeforeExpiry =
jsonObject.getValue(JsonFields.REQUESTED_ACKS_BEFORE_EXPIRY)
.map(ImmutableSubjectAnnouncement::deserializeRequestedAcks)
.orElse(Collections.emptyList());
final DittoDuration requestedAcksTimeout = jsonObject.getValue(JsonFields.REQUESTED_ACKS_TIMEOUT)
.map(DittoDuration::parseDuration)
.orElse(null);
if (beforeExpiryString.isPresent()) {
try {
final DittoDuration beforeExpiry = DittoDuration.parseDuration(beforeExpiryString.get());
if (!BEFORE_EXPIRY_DURATION_UNITS.contains(beforeExpiry.getChronoUnit())) {
throw SubjectAnnouncementInvalidException.newBuilder(beforeExpiryString.get()).build();
}
return new ImmutableSubjectAnnouncement(beforeExpiry, whenDeleted);
return new ImmutableSubjectAnnouncement(beforeExpiry, whenDeleted, requestedAcksBeforeExpiry,
requestedAcksTimeout);
} catch (final IllegalArgumentException e) {
throw SubjectAnnouncementInvalidException.newBuilder(beforeExpiryString.get()).build();
}
} else {
return new ImmutableSubjectAnnouncement(null, whenDeleted);
return new ImmutableSubjectAnnouncement(null, whenDeleted, requestedAcksBeforeExpiry, requestedAcksTimeout);
}
}

Expand All @@ -73,36 +95,75 @@ public boolean isWhenDeleted() {
return whenDeleted;
}

@Override
public List<AcknowledgementRequest> getRequestedAcksBeforeExpiry() {
return requestedAcksBeforeExpiry;
}

@Override
public Optional<DittoDuration> getRequestedAcksTimeout() {
return Optional.ofNullable(requestedAcksTimeout);
}

@Override
public SubjectAnnouncement setBeforeExpiry(@Nullable final DittoDuration beforeExpiry) {
return new ImmutableSubjectAnnouncement(beforeExpiry, whenDeleted, requestedAcksBeforeExpiry,
requestedAcksTimeout);
}

@Override
public JsonObject toJson() {
final JsonObjectBuilder builder = JsonObject.newBuilder();
if (beforeExpiry != null) {
builder.set(JsonFields.BEFORE_EXPIRY, beforeExpiry.toString());
}
builder.set(JsonFields.WHEN_DELETED, whenDeleted);
if (!requestedAcksBeforeExpiry.isEmpty()) {
builder.set(JsonFields.REQUESTED_ACKS_BEFORE_EXPIRY, serializeRequestedAcks(requestedAcksBeforeExpiry));
}
if (requestedAcksTimeout != null) {
builder.set(JsonFields.REQUESTED_ACKS_TIMEOUT, requestedAcksTimeout.toString());
}
return builder.build();
}

@Override
public boolean equals(final Object other) {
if (other instanceof ImmutableSubjectAnnouncement) {
final ImmutableSubjectAnnouncement that = (ImmutableSubjectAnnouncement) other;
return Objects.equals(beforeExpiry, that.beforeExpiry) && whenDeleted == that.whenDeleted;
return Objects.equals(beforeExpiry, that.beforeExpiry) && whenDeleted == that.whenDeleted &&
Objects.equals(requestedAcksBeforeExpiry, that.requestedAcksBeforeExpiry) &&
Objects.equals(requestedAcksTimeout, that.requestedAcksTimeout);
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(beforeExpiry, whenDeleted);
return Objects.hash(beforeExpiry, whenDeleted, requestedAcksBeforeExpiry, requestedAcksTimeout);
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[beforeExpiry=" + beforeExpiry +
", whenDeleted=" + whenDeleted +
", requestedAcksBeforeExpiry=" + requestedAcksBeforeExpiry +
", requestedAcksTimeout" + requestedAcksTimeout +
"]";
}

private static List<AcknowledgementRequest> deserializeRequestedAcks(final JsonArray jsonArray) {
return jsonArray.stream()
.map(value -> AcknowledgementRequest.parseAcknowledgementRequest(value.asString()))
.collect(Collectors.toList());
}

private static JsonArray serializeRequestedAcks(final List<AcknowledgementRequest> requestedAcks) {
return requestedAcks.stream()
.map(AcknowledgementRequest::getLabel)
.map(JsonValue::of)
.collect(JsonCollectors.valuesToArray());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@
*/
package org.eclipse.ditto.policies.model;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.common.DittoDuration;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.common.DittoDuration;
import org.eclipse.ditto.base.model.json.Jsonifiable;

/**
* Represents announcement settings of a {@link Subject}.
Expand All @@ -40,7 +44,25 @@ public interface SubjectAnnouncement extends Jsonifiable<JsonObject> {
* @return the new {@link SubjectAnnouncement}.
*/
static SubjectAnnouncement of(@Nullable final DittoDuration beforeExpiry, final boolean whenDeleted) {
return new ImmutableSubjectAnnouncement(beforeExpiry, whenDeleted);
return new ImmutableSubjectAnnouncement(beforeExpiry, whenDeleted, Collections.emptyList(),
null);
}

/**
* Returns a new {@link SubjectAnnouncement} with the given configuration.
*
* @param beforeExpiry duration before expiry when an announcement should be sent, or null if no announcement should
* be sent.
* @param whenDeleted whether an announcement should be sent when the subject is deleted.
* @param requestedAcksBeforeExpiry acknowledgement requests for subject deletion announcements before expiry.
* @param requestedAcksTimeout timeout of acknowledgement requests.
* @return the new {@link SubjectAnnouncement}.
*/
static SubjectAnnouncement of(@Nullable final DittoDuration beforeExpiry, final boolean whenDeleted,
final List<AcknowledgementRequest> requestedAcksBeforeExpiry,
@Nullable DittoDuration requestedAcksTimeout) {
return new ImmutableSubjectAnnouncement(beforeExpiry, whenDeleted, requestedAcksBeforeExpiry,
requestedAcksTimeout);
}

/**
Expand Down Expand Up @@ -69,6 +91,28 @@ static SubjectAnnouncement fromJson(final JsonObject jsonObject) {
*/
boolean isWhenDeleted();

/**
* Returns acknowledgement requests to fulfill for subject deletion announcements before expiry.
*
* @return the acknowledgement requests.
*/
List<AcknowledgementRequest> getRequestedAcksBeforeExpiry();

/**
* Returns timeout of acknowledgement requests.
*
* @return the timeout.
*/
Optional<DittoDuration> getRequestedAcksTimeout();

/**
* Returns a copy of this object with the field {@code beforeExpiry} replaced.
*
* @param beforeExpiry the new value.
* @return the copy.
*/
SubjectAnnouncement setBeforeExpiry(@Nullable DittoDuration beforeExpiry);

/**
* Fields of the JSON representation of a {@code SubjectAnnouncement} object.
*/
Expand All @@ -86,6 +130,18 @@ final class JsonFields {
public static final JsonFieldDefinition<Boolean> WHEN_DELETED =
JsonFactory.newBooleanFieldDefinition("whenDeleted");

/**
* Field to store requested acknowledgements for announcements before expiry.
*/
public static final JsonFieldDefinition<JsonArray> REQUESTED_ACKS_BEFORE_EXPIRY =
JsonFactory.newJsonArrayFieldDefinition("requestedAcks/beforeExpiry");

/**
* Field to store timeout waiting for requested acknowledgements.
*/
public static final JsonFieldDefinition<String> REQUESTED_ACKS_TIMEOUT =
JsonFactory.newStringFieldDefinition("requestedAcks/timeout");

private JsonFields() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
Expand All @@ -33,16 +39,20 @@ public final class ImmutableSubjectAnnouncementTest {

private static final DittoDuration BEFORE_EXPIRY = DittoDuration.parseDuration("5m");
private static final String BEFORE_EXPIRY_STRING = BEFORE_EXPIRY.toString();
private static final JsonArray REQUESTED_ACKS = JsonArray.of("[\"integration:connection\"]");
private static final DittoDuration ACKS_TIMEOUT = DittoDuration.parseDuration("10s");

static final JsonObject KNOWN_JSON = JsonObject.newBuilder()
.set(SubjectAnnouncement.JsonFields.BEFORE_EXPIRY, BEFORE_EXPIRY_STRING)
.set(SubjectAnnouncement.JsonFields.WHEN_DELETED, true)
.set(SubjectAnnouncement.JsonFields.REQUESTED_ACKS_BEFORE_EXPIRY, REQUESTED_ACKS)
.set(SubjectAnnouncement.JsonFields.REQUESTED_ACKS_TIMEOUT, ACKS_TIMEOUT.toString())
.build();

@Test
public void assertImmutability() {
assertInstancesOf(ImmutableSubjectAnnouncement.class, areImmutable(),
provided(DittoDuration.class).isAlsoImmutable());
provided(DittoDuration.class, AcknowledgementRequest.class).isAlsoImmutable());
}

@Test
Expand All @@ -52,7 +62,10 @@ public void testHashCodeAndEquals() {

@Test
public void testToAndFromJson() {
final SubjectAnnouncement underTest = SubjectAnnouncement.of(BEFORE_EXPIRY, true);
final List<AcknowledgementRequest> requestedAcks =
Collections.singletonList(AcknowledgementRequest.parseAcknowledgementRequest("integration:connection"));
final SubjectAnnouncement underTest =
SubjectAnnouncement.of(BEFORE_EXPIRY, true, requestedAcks, ACKS_TIMEOUT);

final JsonObject subjectAnnouncementJson = underTest.toJson();
final SubjectAnnouncement deserialized = SubjectAnnouncement.fromJson(subjectAnnouncementJson);
Expand All @@ -62,12 +75,13 @@ public void testToAndFromJson() {
}

@Test
public void testToAndFromSubjectAnnoucementWithoutExpiryAndNotWhenDeleted() {
public void testToAndFromSubjectAnnouncementWithoutExpiryAndNotWhenDeleted() {
final SubjectAnnouncement underTest = SubjectAnnouncement.of(null, false);
final JsonObject emptyJson = underTest.toJson();
final SubjectAnnouncement emptyAnnouncement = SubjectAnnouncement.fromJson(emptyJson);
assertThat(emptyJson).containsExactly(
JsonField.newInstance(SubjectAnnouncement.JsonFields.WHEN_DELETED.getPointer().getRoot().get(),
JsonField.newInstance(SubjectAnnouncement.JsonFields.WHEN_DELETED.getPointer().getRoot()
.orElseThrow(NoSuchElementException::new),
JsonValue.of(false)));
assertThat(emptyAnnouncement).isEqualTo(underTest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

import java.time.Instant;
import java.util.Collections;

import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.common.DittoDuration;
import org.eclipse.ditto.json.JsonObject;
import org.junit.Test;

import nl.jqno.equalsverifier.EqualsVerifier;
Expand Down Expand Up @@ -71,7 +73,11 @@ public void testToAndFromJsonWithAllFields() {
final Subject subject = ImmutableSubject.of(SubjectId.newInstance(SubjectIssuer.GOOGLE, "myself"),
SubjectType.newInstance(KNOWN_SUBJECT_TYPE),
SubjectExpiry.newInstance(KNOWN_SUBJECT_EXPIRY_STR),
SubjectAnnouncement.of(DittoDuration.parseDuration("5m"), true));
SubjectAnnouncement.of(DittoDuration.parseDuration("5m"), true,
Collections.singletonList(
AcknowledgementRequest.parseAcknowledgementRequest("integration:connection")),
DittoDuration.parseDuration("10s")
));

final Subject subject1 = ImmutableSubject.fromJson(SubjectIssuer.GOOGLE + ":myself",
KNOWN_SUBJECT_JSON);
Expand Down Expand Up @@ -122,7 +128,8 @@ public void createSubjectWithUnknownTypeSuccess() {
@Test
public void createSubjectWithIssuerSuccess() {
final Subject underTest =
Subject.newInstance(TestConstants.Policy.SUBJECT_ISSUER, TestConstants.Policy.SUBJECT_ID_SUBJECT, TestConstants.Policy.SUBJECT_TYPE);
Subject.newInstance(TestConstants.Policy.SUBJECT_ISSUER, TestConstants.Policy.SUBJECT_ID_SUBJECT,
TestConstants.Policy.SUBJECT_TYPE);

assertThat(underTest).isNotNull();
assertThat(underTest.getId()).isEqualTo(TestConstants.Policy.SUBJECT_ID);
Expand All @@ -132,7 +139,8 @@ public void createSubjectWithIssuerSuccess() {
@Test
public void subjectWithIssuerEqualsSubjectAndIssuer() {
final Subject subjectAndIssuer =
Subject.newInstance(TestConstants.Policy.SUBJECT_ISSUER, TestConstants.Policy.SUBJECT_ID_SUBJECT, TestConstants.Policy.SUBJECT_TYPE);
Subject.newInstance(TestConstants.Policy.SUBJECT_ISSUER, TestConstants.Policy.SUBJECT_ID_SUBJECT,
TestConstants.Policy.SUBJECT_TYPE);
final Subject subjectWithIssuer = Subject.newInstance(
TestConstants.Policy.SUBJECT_ID, TestConstants.Policy.SUBJECT_TYPE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ protected SubjectAnnouncement roundSubjectAnnouncement(@Nullable final SubjectAn
final var roundedUpDuration =
roundUpDuration(dittoDuration.getDuration(), policyDeletionAnnouncementGranularity);
final var roundedUpDittoDuration = dittoDuration.setAmount(roundedUpDuration);
return SubjectAnnouncement.of(roundedUpDittoDuration, subjectAnnouncement.isWhenDeleted());
return subjectAnnouncement.setBeforeExpiry(roundedUpDittoDuration);
} else {
return subjectAnnouncement;
}
Expand Down

0 comments on commit 4f77e5b

Please sign in to comment.